Skip to content

Commit ede1a07

Browse files
committed
feat: added connect to gateway option
1 parent 779b2d4 commit ede1a07

1 file changed

Lines changed: 33 additions & 0 deletions

File tree

zalfmas_common/service.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,35 @@ async def heartbeat():
220220
except Exception as e:
221221
print("Error registering vat. Exception:", e)
222222

223+
async def register_service_at_gateways(
224+
con_man: ConnectionManager, name: str, service, gateways: list, admin: Admin
225+
):
226+
for gw in gateways:
227+
try:
228+
sr = gw["sturdy_ref"]
229+
print("Trying to register vat at gateway sturdy_ref:", sr)
230+
gateway = await con_man.try_connect(
231+
sr, cast_as=persistence_capnp.Gateway
232+
)
233+
if gateway:
234+
res = await gateway.register(service)
235+
print(res)
236+
hb = res.heartbeat
237+
hb_int = res.secsHeartbeatInterval
238+
service_sr_at_gateway = res.sturdyRef
239+
240+
async def heartbeat():
241+
while True:
242+
await asyncio.sleep(hb_int)
243+
#print("beat", datetime.now())
244+
await hb.beat()
245+
246+
admin.tasks.append(asyncio.create_task(heartbeat()))
247+
print(f"service: {name}, sr@'{gw['name']}': {common.sturdy_ref_str_from_sr(service_sr_at_gateway)}")
248+
else:
249+
print("Couldn't connect to gateway at sturdy_ref:", sr)
250+
except Exception as e:
251+
print("Error registering service at gateway. Exception:", e)
223252

224253
async def init_and_run_service_from_config(
225254
config: dict,
@@ -238,6 +267,7 @@ async def init_and_run_service_from_config(
238267
serve_bootstrap=cv.get("serve_bootstrap", True),
239268
registries=cs.get("registries", None),
240269
resolvers=cv.get("resolvers", None),
270+
gateways=cs.get("gateways", None),
241271
con_man=con_man,
242272
restorer=restorer,
243273
restorer_container_sr=cv.get("restorer_container_sr", None),
@@ -256,9 +286,11 @@ async def init_and_run_service(
256286
restorer_container_sr: str = None,
257287
registries: dict = None,
258288
resolvers: dict = None,
289+
gateways: dict = None,
259290
):
260291
registries = registries if registries else []
261292
resolvers = resolvers if resolvers else []
293+
gateways = gateways if gateways else []
262294

263295
if not restorer:
264296
restorer = common.Restorer()
@@ -295,6 +327,7 @@ async def new_connection(stream):
295327
restorer.port = server.sockets[0].getsockname()[1]
296328

297329
for name, s in name_to_service.items():
330+
await register_service_at_gateways(con_man, name, s, gateways, admin)
298331
res = await restorer.save_str(
299332
cap=s, fixed_sr_token=name_to_service_srs.get(name, None)
300333
)

0 commit comments

Comments
 (0)