-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
148 lines (132 loc) · 5.65 KB
/
main.py
File metadata and controls
148 lines (132 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
import time
import json
from camera import Camera
from openai_client import send_image_and_get_instructions, parse_response
from audio import SpeechToText, TextToSpeech
from motor_control import init_gpio, cleanup_gpio, MotorDriver, mecanum_drive
def parse_command(command):
    """Map a recognized voice command to an (action, target) pair.

    Returns one of:
      ("search", <target phrase>) -- command contained "look for" or "find"
      ("see", None)               -- command asked to describe what is ahead
      ("waiting", None)           -- empty/unrecognized command
    """
    if not command:
        return ("waiting", None)

    text = command.lower()

    # Search triggers, checked in priority order; the target is everything
    # after the first occurrence of the trigger phrase.
    for trigger in ("look for", "find"):
        if trigger in text:
            return ("search", text.split(trigger, 1)[1].strip())

    # "Describe the scene" requires both a question word and a direction word.
    asks_description = "what" in text or "see" in text or "describe" in text
    mentions_ahead = "front" in text or "ahead" in text
    if asks_description and mentions_ahead:
        return ("see", None)

    return ("waiting", None)
def search_for_target(target, camera, tts, simulate, lf, lr, rf, rr):
    """Rotate the robot in steps, photographing and querying the vision API
    at each stop, until the named book is found or a full rotation completes.

    Args:
        target: spoken name of the book to find (" book" is appended if absent).
        camera: Camera instance providing base64 frames via capture_frame().
        tts: TextToSpeech instance used for spoken progress feedback.
        simulate: passed through to send_image_and_get_instructions().
        lf, lr, rf, rr: the four MotorDriver instances for mecanum driving.

    Returns:
        True if the target was found (robot stopped facing it), else False.
    """
    # Append "book" if not already there so the vision prompt is unambiguous.
    if "book" not in target:
        target += " book"
    tts.speak(f"Searching for {target}.")
    attempts = 0
    max_attempts = 36  # roughly one full rotation (360°)
    rotation_speed = 40
    rotation_duration = 0.3
    while attempts < max_attempts:
        if attempts > 0:
            # Let the chassis settle after the previous rotation step.
            tts.speak("Stabilizing position.")
            time.sleep(5)
        tts.speak("Capturing image.")
        b64_img = camera.capture_frame()
        prompt = (
            f"Analyze the image and determine if the book '{target}' is visible. "
            "If it is, respond with a JSON object: {\"found\": true, \"description\": <brief description>}. "
            "If it is not visible, respond with {\"found\": false}."
        )
        tts.speak("Analyzing.")
        response_text = send_image_and_get_instructions(b64_img, prompt, simulate=simulate)
        print("\n--- RAW API RESPONSE ---")
        print(response_text)
        # If no valid response, say "Thinking" and retry after waiting.
        # NOTE(review): this retry does not count toward max_attempts, so a
        # persistently failing API keeps the loop alive — confirm intended.
        if not response_text:
            tts.speak("Thinking.")
            time.sleep(5)
            continue
        # BUG FIX: data must be pre-bound; previously, if json.loads raised on
        # the first iteration and the substring fallback set found=True, the
        # isinstance(data, dict) check below hit a NameError.
        data = None
        try:
            data = json.loads(response_text)
            found = data.get("found", False)
        except Exception as e:
            # Model replied with non-JSON (or JSON that is not an object);
            # fall back to a case-insensitive substring match on the raw text.
            print("JSON parsing error:", e)
            found = target.lower() in response_text.lower()
        if found:
            if isinstance(data, dict):
                description = data.get("description", "it looks interesting")
            else:
                description = "it looks interesting"
            tts.speak(f"Found {target}: {description}.")
            return True
        tts.speak("Not found here. Waiting 5 seconds.")
        time.sleep(5)
        tts.speak("Rotating slowly.")
        # Rotate left at defined speed and duration, then stop to take the
        # next frame from a stable position.
        mecanum_drive(lf, lr, rf, rr, "rotate_left", rotation_speed)
        time.sleep(rotation_duration)
        mecanum_drive(lf, lr, rf, rr, "stop", 0)
        attempts += 1
    tts.speak(f"Could not locate {target} after a full rotation.")
    return False
def main():
    """Run the voice-interactive visual assistant loop.

    Initializes GPIO, the four mecanum motors, the camera, and the speech
    interfaces, then listens for voice commands forever:
      - "look for"/"find" <target>: rotate-and-search, then drive forward on success.
      - describe-what's-ahead commands: capture one frame and speak the description.
    Cleans up the camera and GPIO on exit (including Ctrl-C).
    """
    # Initialize GPIO and motors.
    init_gpio()
    # Pin triples are (presumably in1, in2, pwm) per wheel — TODO confirm
    # against MotorDriver's constructor.
    lf = MotorDriver(5, 6, 12)  # Left Front Motor
    lr = MotorDriver(27, 22, 18)  # Left Rear Motor
    rf = MotorDriver(24, 23, 19)  # Right Front Motor
    rr = MotorDriver(26, 17, 13)  # Right Rear Motor
    # Use a lower resolution to reduce token load.
    camera = Camera(capture_resolution=(640, 480), output_resolution=(320, 240))
    stt = SpeechToText()
    tts = TextToSpeech()
    SIMULATE_GPT = False  # Make sure your API key is properly set in your environment
    detailed_prompt = (
        "Analyze the provided camera image and give a short description of what you see. "
        "If there is a book or notebook visible, include a brief description of its cover. "
        "Then answer: What do you see?"
    )
    tts.speak("Voice Interactive Visual Assistant started. Please say a command.")
    print("I am waiting for instructions.")
    try:
        while True:
            command_text = stt.listen_command()
            print("Recognized command:", command_text)
            mode, target = parse_command(command_text)
            if mode == "search" and target:
                if search_for_target(target, camera, tts, SIMULATE_GPT, lf, lr, rf, rr):
                    tts.speak("Consider it done!")
                    # Target found: approach it with a short fixed forward burst.
                    tts.speak("Moving forward at normal speed for one and a half seconds.")
                    forward_speed = 60
                    forward_duration = 1.5
                    mecanum_drive(lf, lr, rf, rr, "forward", forward_speed)
                    time.sleep(forward_duration)
                    mecanum_drive(lf, lr, rf, rr, "stop", 0)
                else:
                    tts.speak("I couldn't find the target. Please try again.")
            elif mode == "see":
                # One-shot scene description: capture, query, speak the answer.
                b64_img = camera.capture_frame()
                response_text = send_image_and_get_instructions(b64_img, detailed_prompt, simulate=SIMULATE_GPT)
                if response_text:
                    final_answer = parse_response(response_text)
                    tts.speak(final_answer)
                else:
                    tts.speak("Sorry, I didn't get a response.")
            else:
                tts.speak("I am waiting for instructions.")
            # Brief pause before listening for the next command.
            time.sleep(1)
    except KeyboardInterrupt:
        tts.speak("Exiting. Goodbye!")
        print("Exiting due to keyboard interrupt.")
    finally:
        # Always release hardware resources, even on error or Ctrl-C.
        camera.close()
        cleanup_gpio()
# Run the assistant only when executed as a script, not when imported.
if __name__ == "__main__":
    main()