-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpacket.py
More file actions
64 lines (52 loc) · 1.84 KB
/
packet.py
File metadata and controls
64 lines (52 loc) · 1.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import json
import time
from dataclasses import dataclass
from typing import Optional, Any, Dict
@dataclass
class Packet:
ts: float
seq: int = 0
override: int = 0
joyx: int = 0
joyy: int = 0
calm: float = 0.0
engage: float = 0.0
trig: int = 0
emotion: str = "neutral"
def _clamp_int(v: int, lo: int, hi: int) -> int:
return max(lo, min(hi, int(v)))
def _clamp_float(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, float(v)))
def parse_packet(raw: bytes) -> Optional[Packet]:
try:
s = raw.decode("utf-8", errors="ignore").strip()
if not s:
return None
ts = time.time()
if s.startswith("{"):
obj: Dict[str, Any] = json.loads(s)
return Packet(
ts=ts,
seq=_clamp_int(obj.get("seq", 0), 0, 65535),
override=_clamp_int(obj.get("override", 0), 0, 1),
joyx=_clamp_int(obj.get("joyx", obj.get("joy_x", 0)), -100, 100),
joyy=_clamp_int(obj.get("joyy", obj.get("joy_y", 0)), -100, 100),
calm=_clamp_float(obj.get("calm", 0.0), 0.0, 1.0),
engage=_clamp_float(obj.get("engage", 0.0), 0.0, 1.0),
trig=_clamp_int(obj.get("trig", obj.get("trigger", 0)), 0, 1),
emotion=str(obj.get("emotion", "neutral")),
)
parts = [p.strip() for p in s.split(",")]
if len(parts) < 6:
return None
return Packet(
ts=ts,
seq=_clamp_int(int(parts[0]), 0, 65535),
override=_clamp_int(int(parts[1]), 0, 1),
joyx=_clamp_int(int(parts[2]), -100, 100),
joyy=_clamp_int(int(parts[3]), -100, 100),
emotion=str(parts[4]),
trig=_clamp_int(int(parts[5]), 0, 1),
)
except Exception:
return None