Skip to content

Commit 71125ab

Browse files
committed
- using resolver
- testing move to json configs
1 parent 3b6f652 commit 71125ab

2 files changed

Lines changed: 185 additions & 75 deletions

File tree

zalfmas_common/common.py

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -98,37 +98,55 @@ def update_config(config, argv, print_config=False, allow_new_keys=False):
9898
print(config)
9999

100100

101-
def create_service_toml_config(header_comment: str = None, id = None, name=None, description=None,
102-
host=None, port=None, serve_bootstrap=None, sturdy_ref_token=None,
103-
reg_sturdy_ref=None, reg_category=None, **kwargs) -> tk.TOMLDocument:
104-
config = tk.document()
105-
config.add(tk.comment(header_comment if header_comment else "MAS service configuration file"))
106-
config.add(tk.nl())
107-
101+
def create_service_toml_config(header_comment: str = None,
102+
id = None, name=None, description=None,
103+
host=None, port=None, serve_bootstrap=None,
104+
fixed_sturdy_ref_token=None,
105+
reg_sturdy_ref=None, reg_category=None,
106+
resolver_sturdy_ref=None, resolver_alias=None) -> tk.TOMLDocument:
107+
doc = tk.document()
108+
doc.add(tk.comment(header_comment if header_comment else "MAS service configuration file"))
109+
doc.add(tk.nl())
110+
111+
services = tk.aot()
108112
service = tk.table()
109113
service["id"] = id if id else str(uuid.uuid4())
110114
service["name"] = name if name else f"Service {service['id']}"
111115
service["description"] = description if description else f"No description for service {service['id']}"
112-
config.add("service", service)
113-
114-
caps = tk.table()
115-
caps["fixed_sturdy_ref_token"] = sturdy_ref_token if sturdy_ref_token else "a_sturdy_ref_token"
116-
register_at = tk.aot()
117-
register_at.append(tk.item({
118-
"sturdy_ref": reg_sturdy_ref if reg_sturdy_ref else "capnp://the_host_key@host:port/a_sturdy_ref_token",
119-
"category": reg_category if reg_category else "a registry category",
120-
}))
121-
config.add("register_at", register_at)
122-
config.add("caps", caps)
123-
124-
network = tk.table()
125-
network["host"] = host if host else "localhost"
126-
network["port"] = port if port else 0
127-
network["serve_bootstrap"] = serve_bootstrap if serve_bootstrap else True
128-
config.add("network", network)
129-
130-
return config
131-
116+
service["fixed_sturdy_ref_token"] = fixed_sturdy_ref_token if fixed_sturdy_ref_token else "a_sturdy_ref_token"
117+
services.append(service)
118+
119+
regs = tk.aot()
120+
regs.name = "services.registries"
121+
reg = tk.table()
122+
reg["sturdy_ref"] = reg_sturdy_ref if reg_sturdy_ref else "capnp://the_host_key@host:port/a_sturdy_ref_token"
123+
reg["category"] = reg_category if reg_category else "a registry category"
124+
regs.append(reg)
125+
126+
doc.add("services", services)
127+
128+
resolvers = tk.aot()
129+
resolvers.name = "resolvers"
130+
res = tk.table()
131+
res["alias"] = resolver_alias if resolver_alias else "some_alias_for_the_resolver"
132+
res["resolver_sturdy_ref"] = reg_category if reg_category else "capnp://the_host_key@host:port/a_sturdy_ref_token"
133+
resolvers.append(res)
134+
135+
# register_at = tk.aot()
136+
# register_at.append(tk.item({
137+
# "sturdy_ref": reg_sturdy_ref if reg_sturdy_ref else "capnp://the_host_key@host:port/a_sturdy_ref_token",
138+
# "category": reg_category if reg_category else "a registry category",
139+
# }))
140+
# config.add("register_at", register_at)
141+
# config.add("caps", caps)
142+
143+
vat = tk.table()
144+
vat["host"] = host if host else "localhost"
145+
vat["port"] = port if port else 0
146+
vat["serve_bootstrap"] = serve_bootstrap if serve_bootstrap else True
147+
doc.add("vat", vat)
148+
149+
return doc
132150

133151

134152
# def sign_sr_token_by_sk_and_encode_base64(self, sr_token):
@@ -194,6 +212,15 @@ def set_vat_id_from_sign_pk(self):
194212
int.from_bytes(self._sign_pk[24:32], byteorder=sys.byteorder, signed=False),
195213
]
196214

215+
@property
216+
def base64_vat_id(self):
217+
return base64.urlsafe_b64encode(self._sign_pk).decode("utf-8")
218+
219+
def signature_of_vat_id(self):
220+
public_key_bytes = base64.urlsafe_b64encode(self._sign_pk)
221+
signature = pysodium.crypto_sign_detached(public_key_bytes, self._sign_sk)
222+
return signature
223+
197224
@property
198225
def storage_container(self):
199226
return self._storage_container
@@ -574,6 +601,10 @@ def __init__(self, restorer=None):
574601
self._connections = {}
575602
self._restorer = restorer if restorer else Restorer()
576603

604+
@property
605+
def restorer(self):
606+
return self._restorer
607+
577608
async def connect(self, sturdy_ref, cast_as=None):
578609
if not sturdy_ref:
579610
return None

zalfmas_common/service.py

Lines changed: 127 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import tomlkit as tk
2121
from zalfmas_common import common
2222
import zalfmas_capnp_schemas
23+
from zalfmas_common.common import ConnectionManager
24+
2325
sys.path.append(os.path.dirname(zalfmas_capnp_schemas.__file__))
2426
import common_capnp
2527
import fbp_capnp
@@ -110,71 +112,98 @@ async def updateIdentity(self, oldId, newInfo, **kwargs): # updateIdentity @4 (
110112
s.description = newInfo.description
111113

112114

113-
async def init_and_run_service(name_to_service, host=None, port=0, serve_bootstrap=True, restorer=None,
114-
conn_man=None, name_to_service_srs=None, run_before_enter_eventloop=None,
115-
restorer_container_sr=None, read_from_stdin=False, **kwargs):
116-
port = port if port else 0
115+
async def register_services(con_man: ConnectionManager,
116+
name_to_service: dict,
117+
admin: Admin,
118+
registries: list[dict]):
119+
for name, cap in name_to_service.items():
120+
for reg in registries:
121+
try:
122+
reg_sr = data["sturdy_ref"]
123+
reg_name = data.get("name", "")
124+
reg_cat_id = data.get("category_id", "")
125+
print("Trying to register service with name:", reg_name, "@ category:", reg_cat_id)
126+
registrar = await con_man.try_connect(reg_sr, cast_as=reg_capnp.Registrar)
127+
if registrar:
128+
r = await registrar.register(cap=cap, regName=reg_name, categoryId=reg_cat_id)
129+
unreg_action = r.unreg
130+
rereg_sr = r.reregSR
131+
admin.store_unreg_data(name, unreg_action, rereg_sr)
132+
print("Registered service", name, "in category '", reg_cat_id, "' as '", reg_name, "'.")
133+
else:
134+
print("Couldn't connect to registrar at sturdy_ref:", reg_sr)
135+
except Exception as e:
136+
print("Error registering service name:", name, "using data:", reg, ". Exception:", e)
137+
117138

118-
# check for sturdy ref inputs
119-
reg_config = {}
120-
if read_from_stdin and not sys.stdin.isatty():
139+
async def register_vat_at_resolvers(con_man: ConnectionManager, resolvers: list):
140+
for res in resolvers:
121141
try:
122-
reg_config = json.loads(sys.stdin.read())
123-
# print("read from stdin:", reg_config)
142+
sr = res["sturdy_ref"]
143+
print("Trying to register vat at resolver sturdy_ref:", sr)
144+
registrar = await con_man.try_connect(sr, cast_as=persistence_capnp.HostPortResolver.Registrar)
145+
if registrar:
146+
req = registrar.register_request()
147+
req.host = con_man.restorer.host
148+
req.port = con_man.restorer.port
149+
req.base64VatId = con_man.restorer.base64_vat_id
150+
req.identityProof = con_man.restorer.signature_of_vat_id()
151+
if "alias" in res:
152+
req.alias = res["alias"]
153+
r = await req.send()
154+
#r = await registrar.register(cap=name_to_service[name], regName=service_name,
155+
# categoryId=alias)
156+
hb = r.heartbeat
157+
hb_int = r.secsHeartbeatInterval
158+
capnp.getTimer().after_delay(hb_int * 10 ** 9).then(lambda: exit(0))
159+
#admin.store_unreg_data(name, unreg_action, rereg_sr)
160+
print("Registered at resolver vat with vat_id:", con_man.restorer.base64VatIdservice_name,
161+
f"and alias {res['alias']}." if "alias" in res else ".")
162+
else:
163+
print("Couldn't connect to registrar at sturdy_ref:", sr)
124164
except Exception as e:
125-
print("service.py: Error reading from sys.stdin. Exception:", e)
165+
print("Error registering vat. Exception:", e)
166+
167+
168+
async def init_and_run_service(name_to_service,
169+
host=None, port=0,
170+
serve_bootstrap=True,
171+
restorer=None,
172+
con_man=None,
173+
name_to_service_srs=None,
174+
run_before_enter_eventloop=None,
175+
restorer_container_sr=None,
176+
registries=None,
177+
resolvers=None):
178+
port = port if port else 0
179+
180+
registries = registries if registries else []
181+
resolvers = resolvers if resolvers else []
126182

127183
if not restorer:
128184
restorer = common.Restorer()
129-
if not conn_man:
130-
conn_man = common.ConnectionManager(restorer)
185+
if not con_man:
186+
con_man = common.ConnectionManager(restorer)
131187
if not name_to_service_srs:
132188
name_to_service_srs = {}
133189

134190
if restorer and restorer_container_sr:
135-
restorer_container = await conn_man.try_connect(restorer_container_sr, cast_as=storage_capnp.Store.Container)
191+
restorer_container = await con_man.try_connect(restorer_container_sr, cast_as=storage_capnp.Store.Container)
136192
if restorer_container:
137193
restorer.storage_container = restorer_container
138194
await restorer.init_vat_id_from_container()
139195
if not port:
140196
await restorer.init_port_from_container()
141197
port = restorer.port
142198

143-
# create and register admin with services
199+
# create and register admin interface with services
144200
admin = Admin(list(name_to_service.values()))
145201
for s in name_to_service.values():
146202
if isinstance(s, AdministrableService):
147203
s.admin = admin
148204
if "admin" not in name_to_service and admin not in name_to_service.values():
149205
name_to_service["admin"] = admin
150206

151-
async def register_services(conn_man, admin, reg_config):
152-
for name, data in reg_config.items():
153-
try:
154-
if isinstance(data, dict):
155-
reg_sr = data["reg_sr"]
156-
reg_name = data.get("reg_name", "")
157-
reg_cat_id = data.get("cat_id", "")
158-
elif isinstance(data, str):
159-
reg_sr = data
160-
reg_name = ""
161-
reg_cat_id = ""
162-
else:
163-
continue
164-
print("trying to register name:", name, "data:", data)
165-
registrar = await conn_man.try_connect(reg_sr, cast_as=reg_capnp.Registrar)
166-
if registrar and name in name_to_service:
167-
r = await registrar.register(cap=name_to_service[name], regName=reg_name,
168-
categoryId=reg_cat_id)
169-
unreg_action = r.unreg
170-
rereg_sr = r.reregSR
171-
admin.store_unreg_data(name, unreg_action, rereg_sr)
172-
print("Registered", name, "in category '", reg_cat_id, "' as '", reg_name, "'.")
173-
else:
174-
print("Couldn't connect to registrar at sturdy_ref:", reg_sr)
175-
except Exception as e:
176-
print("Error registering service name:", name, ". Exception:", e)
177-
178207
async def new_connection(stream):
179208
await capnp.TwoPartyServer(stream, bootstrap=restorer).on_disconnect()
180209

@@ -183,24 +212,25 @@ async def new_connection(stream):
183212
restorer.port = server.sockets[0].getsockname()[1]
184213

185214
for name, s in name_to_service.items():
186-
res = await restorer.save_str(s, name_to_service_srs.get(name, None))
215+
res = await restorer.save_str(cap=s, fixed_sr_token=name_to_service_srs.get(name, None))
187216
name_to_service_srs[name] = res["sturdy_ref"]
188217
print("service:", name, "sr:", res["sturdy_ref"])
189218
print("restorer_sr:", restorer.sturdy_ref_str())
190219

191-
await register_services(conn_man, admin, reg_config)
220+
#await register_services(con_man, name_to_service, admin, registries)
221+
await register_vat_at_resolvers(con_man, resolvers)
192222
if run_before_enter_eventloop:
193223
run_before_enter_eventloop()
194224
async with server:
195225
await server.serve_forever()
196226
#else:
197-
# await register_services(conn_man, admin, reg_config)
227+
# await register_services(con_man, admin, reg_config)
198228
# if run_before_enter_eventloop:
199229
# run_before_enter_eventloop()
200-
# await conn_man.manage_forever()
230+
# await con_man.manage_forever()
201231

202232

203-
def handle_default_service_args(parser, config: dict=None):
233+
def handle_default_service_args_with_dict(parser, config: dict=None):
204234
args = parser.parse_args()
205235

206236
remove_keys = []
@@ -233,13 +263,62 @@ def handle_default_service_args(parser, config: dict=None):
233263
with open(args.config_toml, "r") as f:
234264
toml_config = tk.load(f)
235265
config.update({ k:v for k, v in toml_config.items() if type(v) is not tk.items.Table})
236-
config.update(toml_config.get("defaults", {}))
266+
if "vat" in toml_config:
267+
config.update({f"vat.{k}": v for k, v in toml_config["vat"].items() if type(v) is not tk.items.Table})
237268
else:
238269
parser.error("argument config_toml: expected path to config TOML file")
239270

240271
for k in remove_keys:
241272
del config[k]
242-
return config, args
273+
return config, args, toml_config
274+
275+
def handle_default_service_args_toml(parser, template_toml_str: str=None):
276+
args = parser.parse_args()
277+
278+
if args.output_toml_config:
279+
print(template_toml_str)
280+
exit(0)
281+
elif args.write_toml_config:
282+
with open(args.write_toml_config, "w") as _:
283+
_.write(template_toml_str)
284+
exit(0)
285+
elif args.config_toml is not None:
286+
with open(args.config_toml, "r") as f:
287+
toml_config = tk.load(f)
288+
else:
289+
parser.error("argument config_toml: expected path to config TOML file")
290+
291+
return toml_config, args
292+
293+
294+
def handle_default_service_args(parser, path_to_template_config_json=None, path_to_service_py=None,
295+
relative_path_from_service_py_to_default_configs_folder="default_configs"):
296+
args = parser.parse_args()
297+
json_config = {}
298+
template_config_json = {}
299+
if not path_to_template_config_json and path_to_service_py:
300+
path_to_template_config_json = (
301+
os.path.join(os.path.dirname(path_to_service_py),
302+
relative_path_from_service_py_to_default_configs_folder,
303+
os.path.basename(path_to_service_py).replace(".py", ".json")))
304+
if path_to_template_config_json:
305+
with open(path_to_template_config_json, "r") as f:
306+
template_config_json = json.load(f)
307+
308+
if args.output_toml_config:
309+
print(json.dumps(template_config_json))
310+
exit(0)
311+
elif args.write_toml_config:
312+
with open(args.write_toml_config, "w") as _:
313+
_.write(json.dumps(template_config_json))
314+
exit(0)
315+
elif args.config_toml is not None:
316+
with open(args.config_toml, "r") as f:
317+
json_config = json.load(f)
318+
else:
319+
parser.error("argument config_toml: expected path to config JSON file")
320+
321+
return json_config, args
243322

244323

245324
def create_default_args_parser(

0 commit comments

Comments
 (0)