Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions assembly/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export * from "./net/udp";
export * from "./net/util";
export * from "./process";
export * from "./process/bindings";
export * from "./process/supervisor";
export * from "./registry"
export * from "./registry/bindings";
export * from "./util";
75 changes: 43 additions & 32 deletions assembly/managed/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,110 +2,121 @@ import { Mailbox } from "../message";
import { MessageType } from "../message/util";
import { Process } from "../process";

/** Represents a shared map using the std lib hashmap. */
export class SharedMap<TValue> {
private self: Process<SharedMapEvent<TValue>> = Process.inheritSpawn((mb: Mailbox<SharedMapEvent<TValue>>) => {

/** The process that runs the hashmap. */
private self: Process<SharedMapEvent> = Process.inheritSpawn((mb: Mailbox<SharedMapEvent>) => {
let map = new Map<string, TValue>();
while (true) {
let message = mb.receive();
if (message.type == MessageType.Data) {
let event = message.box!.value;
if (event instanceof GetSharedMapEvent<TValue>) {
let key = (<GetSharedMapEvent<TValue>>event).key;
if (event instanceof GetSharedMapEvent) {
let key = (<GetSharedMapEvent>event).key;
message.reply<TValue>(map.get(key));
} else if (event instanceof SetSharedMapEvent<TValue>) {
let key = (<SetSharedMapEvent<TValue>>event).key;
let value = (<SetSharedMapEvent<TValue>>event).value;
map.set(key, value);
} else if (event instanceof DeleteSharedMapEvent<TValue>) {
let key = (<DeleteSharedMapEvent<TValue>>event).key;
} else if (event instanceof DeleteSharedMapEvent) {
let key = (<DeleteSharedMapEvent>event).key;
map.delete(key);
} else if (event instanceof ClearSharedMapEvent<TValue>) {
} else if (event instanceof ClearSharedMapEvent) {
map.clear();
} else if (event instanceof HasSharedMapEvent<TValue>) {
const key = (<HasSharedMapEvent<TValue>>event).key;
} else if (event instanceof HasSharedMapEvent) {
const key = (<HasSharedMapEvent>event).key;
message.reply<bool>(map.has(key));
} else if (event instanceof SizeSharedMapEvent<TValue>) {
} else if (event instanceof SizeSharedMapEvent) {
message.reply<i32>(map.size);
} else if (event instanceof KeysSharedMapEvent<TValue>) {
} else if (event instanceof KeysSharedMapEvent) {
message.reply<string[]>(map.keys());
} else if (event instanceof ValuesSharedMapEvent<TValue>) {
} else if (event instanceof ValuesSharedMapEvent) {
message.reply<TValue[]>(map.values());
}
}
}
}).expect();

/** Get a value based on the given key. */
get(key: string): TValue {
let event = new GetSharedMapEvent<TValue>(key);
let message = this.self.request<GetSharedMapEvent<TValue>, TValue>(event);
let event = new GetSharedMapEvent(key);
let message = this.self.request<GetSharedMapEvent, TValue>(event);
assert(message.type == MessageType.Data);
return message.box!.value;
}

/** Set a value with a given key. */
set(key: string, value: TValue): this {
let event = new SetSharedMapEvent<TValue>(key, value);
this.self.send(event);
return this;
}

/** Delete a key. */
delete(key: string): void {
let event = new DeleteSharedMapEvent<TValue>(key);
let event = new DeleteSharedMapEvent(key);
this.self.send(event);
}

/** Clear the hashmap. */
clear(): void {
const event = new ClearSharedMapEvent<TValue>();
const event = new ClearSharedMapEvent();
this.self.send(event);
}

/** Check to see if it has the key. */
has(key: string): bool {
const event = new HasSharedMapEvent<TValue>(key);
const message = this.self.request<HasSharedMapEvent<TValue>, bool>(event);
const event = new HasSharedMapEvent(key);
const message = this.self.request<HasSharedMapEvent, bool>(event);
assert(message.type == MessageType.Data);
return message.box!.value;
}

/** Get the size of the hashmap. */
get size(): i32 {
const event = new SizeSharedMapEvent<TValue>();
const message = this.self.request<SizeSharedMapEvent<TValue>, i32>(event);
const event = new SizeSharedMapEvent();
const message = this.self.request<SizeSharedMapEvent, i32>(event);
assert(message.type == MessageType.Data);
return message.box!.value;
}

/** Get all the keys in the hashmap. */
keys(): string[] {
const event = new KeysSharedMapEvent<TValue>();
const message = this.self.request<KeysSharedMapEvent<TValue>, string[]>(event);
const event = new KeysSharedMapEvent();
const message = this.self.request<KeysSharedMapEvent, string[]>(event);
assert(message.type == MessageType.Data);
return message.box!.value;
}

/** Get all the values in the hashmap. */
values(): TValue[] {
const event = new ValuesSharedMapEvent<TValue>();
const message = this.self.request<ValuesSharedMapEvent<TValue>, TValue[]>(event);
const event = new ValuesSharedMapEvent();
const message = this.self.request<ValuesSharedMapEvent, TValue[]>(event);
assert(message.type == MessageType.Data);
return message.box!.value;
}
}

abstract class SharedMapEvent<TValue> {}
abstract class SharedMapEvent {}

class GetSharedMapEvent<TValue> extends SharedMapEvent<TValue> {
class GetSharedMapEvent extends SharedMapEvent {
constructor(
public key: string
) {
super();
}
}

class DeleteSharedMapEvent<TValue> extends SharedMapEvent<TValue> {
class DeleteSharedMapEvent extends SharedMapEvent {
constructor(
public key: string
) {
super();
}
}

class SetSharedMapEvent<TValue> extends SharedMapEvent<TValue> {
class SetSharedMapEvent<TValue> extends SharedMapEvent {
constructor(
public key: string,
public value: TValue,
Expand All @@ -114,16 +125,16 @@ class SetSharedMapEvent<TValue> extends SharedMapEvent<TValue> {
}
}

class ClearSharedMapEvent<TValue> extends SharedMapEvent<TValue> {}
class ClearSharedMapEvent extends SharedMapEvent {}

class HasSharedMapEvent<TValue> extends SharedMapEvent<TValue> {
class HasSharedMapEvent extends SharedMapEvent {
constructor(public key: string) {
super();
}
}

class SizeSharedMapEvent<TValue> extends SharedMapEvent<TValue> {}
class SizeSharedMapEvent extends SharedMapEvent {}

class KeysSharedMapEvent<TValue> extends SharedMapEvent<TValue> {}
class KeysSharedMapEvent extends SharedMapEvent {}

class ValuesSharedMapEvent<TValue> extends SharedMapEvent<TValue> {}
class ValuesSharedMapEvent extends SharedMapEvent {}
192 changes: 192 additions & 0 deletions assembly/process/supervisor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import { Process } from ".";
import { Mailbox, Message } from "../message";
import { MessageType } from "../message/util";

/** This class represents an abstract Supervisor Event that the supervisor must handle. */
export abstract class SupervisorEvent<TStart, TMessage> {
abstract handle(
ctx: HandleSupervisorContext<TStart, TMessage>,
message: Message<SupervisorEvent<TStart, TMessage>>,
): void;
}

/**
* This class represents an intention to spawn a single process. It requires a start value,
* and has a handle method that is called by the handleSupervisor process.
*/
export class SupervisorSpawnMessage<TStart, TMessage> extends SupervisorEvent<TStart, TMessage> {
constructor(
public start: TStart,
) {
super();
}

handle(ctx: HandleSupervisorContext<TStart, TMessage>, message: Message<SupervisorEvent<TStart, TMessage>>): void {

// spawn the process
let process = Process.inheritSpawnWith(this.start, ctx.supervisorContext.spawnProcess)
.expect("Could not spawn process");

// then reply back to the caller that the process was created
message.reply<Process<TMessage>>(process);
let tag = Process.link(process);
ctx.processes.set(tag, new ProcessState<TStart, TMessage>(this.start, process));
ctx.processList.push(process);

// and if there's no delegate, delegate it
if (ctx.delegate == null) {
ctx.supervisorContext.onDelegate(process);
ctx.delegate = process;
}
}
}

/**
* This class represents an intention to send a message to one of the child processes. This
* behavior is configured via the supervisor constructor.
*/
export class SendSupervisorMessage<TStart, TMessage> extends SupervisorEvent<TStart, TMessage> {
constructor(
public message: TMessage
) {
super();
}

handle(ctx: HandleSupervisorContext<TStart, TMessage>, _message: Message<SupervisorEvent<TStart, TMessage>>): void {
ctx.supervisorContext.onSend(ctx, this.message);
}
}

/** This is a helper class for the supervisor process. */
export class SupervisorContext<TStart, TMessage> {

constructor(
public onDelegate: (process: Process<TMessage>) => void,
public spawnProcess: (start: TStart, mb: Mailbox<TMessage>) => void,
public onSend: (ctx: HandleSupervisorContext<TStart, TMessage>, message: TMessage) => void,
) {}
}

/** This is the wrapper class that represents a supervisor process. */
export class Supervisor<TStart, TMessage> {
self: Process<SupervisorEvent<TStart, TMessage>>;

constructor(
onDelegate: (process: Process<TMessage>) => void,
spawnProcess: (start: TStart, mb: Mailbox<TMessage>) => void,
/** This callback is responsible for actually sending the messages to the processes in the process list. */
onSend: (ctx: HandleSupervisorContext<TStart, TMessage>, message: TMessage) => void
= (ctx: HandleSupervisorContext<TStart, TMessage>, message: TMessage) => {

// round robin
let length = ctx.processList.length;
let index = ctx.roundRobinIndex;
index++;
if (index >= length) {
index = 0;
}
ctx.processList[index].send(message);
ctx.roundRobinIndex = index;
},
) {
// spawn the process
let context = new SupervisorContext<TStart, TMessage>(onDelegate, spawnProcess, onSend);
this.self = Process.inheritSpawnWith<
SupervisorContext<TStart, TMessage>,
SupervisorEvent<TStart, TMessage>
> (
context,
handleSupervisor
)
.expect("Could not spawn supervisor.");
}

/**
* Spawn a process that will be supervised and restarted with the same start value
* each time the process fails.
*/
spawn(startValue: TStart): Process<TMessage> {
// request the process from the supervisor handler
let message = this.self
.request<SupervisorEvent<TStart, TMessage>, Process<TMessage>>(
new SupervisorSpawnMessage<TStart, TMessage>(startValue)
);
assert(message.type == MessageType.Data);
return message.unbox();
}

/** Send a message to the child processes. */
send(message: TMessage): void {
this.self.send(new SendSupervisorMessage<TStart, TMessage>(message));
}
}
/** This is a helper class for the supervisor process. */
export class ProcessState<TStart, TMessage> {
constructor(
public startValue: TStart,
public process: Process<TMessage>,
) {}
}

/** This is a helper class for the handle supervisor process. */
export class HandleSupervisorContext<TStart, TMessage> {
public roundRobinIndex: i32 = 0;

constructor(
/** A map of processes that exist based on their tag. */
public processes: Map<u64, ProcessState<TStart, TMessage>>,
/** A list of all the processes. */
public processList: Process<TMessage>[],
/** The delegated process. */
public delegate: Process<TMessage> | null,
/** The supervisor context that contains all the callback events. */
public supervisorContext: SupervisorContext<TStart, TMessage>,
) {}
}

/** This method is the entry point for a given supervisor. */
export function handleSupervisor<TStart, TMessage>(
context: SupervisorContext<TStart, TMessage>,
mb: Mailbox<SupervisorEvent<TStart, TMessage>>,
): void {
let handleSupervisorContext = new HandleSupervisorContext<TStart, TMessage>(
new Map<u64, ProcessState<TStart, TMessage>>(),
[],
null,
context,
);

// process loop
while (true) {
let message = mb.receive();

if (message.type == MessageType.Data) {
// handle supervisor requests here
let event = message.unbox();
event.handle(handleSupervisorContext, message);
} else if (message.type == MessageType.Signal) {
// a process died,
let tag = message.tag;

// get the old process by it's tag
let processContext = handleSupervisorContext.processes.get(tag);
let childProcess = Process.inheritSpawnWith<TStart, TMessage>(
processContext.startValue,
context.spawnProcess,
)
.expect();

// delete the old process
handleSupervisorContext.processes.delete(tag);
let index = handleSupervisorContext.processList.indexOf(childProcess);
if (index != -1) {
handleSupervisorContext.processList.splice(index, 1);
}

// create a new process and set it to the old process context
tag = Process.link(childProcess);
processContext.process = childProcess;
handleSupervisorContext.processes.set(tag, processContext);
}
}
}
2 changes: 1 addition & 1 deletion assembly/process/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ export class StartWrapper<TStart> {
public start: TStart,
public index: usize,
) {}
}
}
Loading