Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix non-windows support for run_robot.py #71

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions run_robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,120 +21,139 @@ def ensure_directory_exists(files, directory):
# Ignore the error if the directory already exists
pass


def copy_file_to_pico(files, local_file_path, pico_destination_path):
"""Copy a single file to the Pico, replacing it if it already exists."""
try:
with open(local_file_path, 'rb') as local_file:
with open(local_file_path, "rb") as local_file:
file_content = local_file.read()

# Write or replace the file on the Pico
files.put(pico_destination_path, file_content)
print(f"Successfully copied {local_file_path} to {pico_destination_path} on Pico.")

print(
f"Successfully copied {local_file_path} to {pico_destination_path} on Pico."
)

except Exception as e:
print(f"Failed to copy file {local_file_path}. Error: {str(e)}")


def copy_directory_to_pico(files, local_directory, pico_destination_directory):
"""Copy an entire directory to the Pico, including all subdirectories and files"""
try:
for root, dirs, files_list in os.walk(local_directory):
relative_path = os.path.relpath(root, local_directory)
pico_dir_path = os.path.join(pico_destination_directory, relative_path).replace("\\", "/")
pico_dir_path = os.path.join(
pico_destination_directory, relative_path
).replace("\\", "/")
ensure_directory_exists(files, pico_dir_path)

for file_name in files_list:
local_file_path = os.path.join(root, file_name)
pico_file_path = os.path.join(pico_dir_path, file_name).replace("\\", "/")
pico_file_path = os.path.join(pico_dir_path, file_name).replace(
"\\", "/"
)
copy_file_to_pico(files, local_file_path, pico_file_path)

except Exception as e:
print(f"Failed to copy directory {local_directory}. Error: {str(e)}")
finally:
pyb.close() # Ensure the connection to the Pico is closed


def create_temp_folder():
"""Create a temporary folder to store the last synced XRPLib files."""
if not os.path.exists(TEMP_FOLDER):
os.makedirs(TEMP_FOLDER)


def compare_and_copy(files, local_directory, pico_destination_directory):
"""Compare local XRPLib with the last synced version and copy only changed files."""
create_temp_folder()
for root, dirs, files_list in os.walk(local_directory):
relative_path = os.path.relpath(root, local_directory)
pico_dir_path = os.path.join(pico_destination_directory, relative_path).replace("\\", "/")
pico_dir_path = os.path.join(pico_destination_directory, relative_path).replace(
"\\", "/"
)
temp_dir_path = os.path.join(TEMP_FOLDER, relative_path)

if not os.path.exists(temp_dir_path):
os.makedirs(temp_dir_path)

for file_name in files_list:
local_file_path = os.path.join(root, file_name)
temp_file_path = os.path.join(temp_dir_path, file_name)
pico_file_path = os.path.join(pico_dir_path, file_name).replace("\\", "/")

if not os.path.exists(temp_file_path) or not filecmp.cmp(local_file_path, temp_file_path, shallow=False):
if not os.path.exists(temp_file_path) or not filecmp.cmp(
local_file_path, temp_file_path, shallow=False
):
# If the file doesn't exist in the temp folder, or it has changed, copy it
copy_file_to_pico(files, local_file_path, pico_file_path)
# Update the temp folder with the new file
shutil.copy2(local_file_path, temp_file_path)


def read_serial_output(port, baudrate=115200, timeout=1):
"""Read and print serial output from the Pico W."""
try:
with serial.Serial(port, baudrate, timeout=timeout) as ser:
print("Reading serial output...")
while True:
if ser.in_waiting > 0:
output = ser.read(ser.in_waiting).decode('utf-8')
print(output, end='') # Print without adding extra newlines
output = ser.read(ser.in_waiting).decode("utf-8")
print(output, end="") # Print without adding extra newlines
time.sleep(0.1) # Small delay to prevent high CPU usage
except Exception as e:
print(f"Failed to read serial output. Error: {str(e)}")


def run_pico_script(port, script_name):
"""Run a MicroPython script on the Pico using ampy."""
try:
subprocess.run(['ampy', '-p', port, 'run', script_name], check=True)
subprocess.run(["ampy", "-p", port, "run", script_name], check=True)
print(f"Successfully ran {script_name} on Pico.")
except subprocess.CalledProcessError as e:
print(f"Failed to run script {script_name}. Error: {str(e)}")


def list_files_on_pico(port):
"""List all files and folders on the Pico."""
try:
pyb = Pyboard(port)
files = Files(pyb)
file_list = files.ls('/')
file_list = files.ls("/")

print("Files and folders on Pico:")
for file in file_list:
print(file)

except Exception as e:
print(f"Failed to list files. Error: {str(e)}")


def copy_all_files_to_pico(port, directory=True, main=True, telemetry=True):
pyb = Pyboard(port)
files = Files(pyb)

if directory:
compare_and_copy(files, "XRPLib", "lib/XRPLib")
if main:
copy_file_to_pico(files, "mainprogram.py", "mainprogram.py")
if telemetry:
copy_file_to_pico(files, "telemetry.txt", "telemetry.txt")


def detect_pico_port():
"""Auto-detect the Pico W's serial port."""
"""Auto-detect the Pico W's serial port based on vendor and product IDs."""
ports = list(serial.tools.list_ports.comports())
for port in ports:
if "usbmodem" in port.device or "COM" in port.device: # Adjust this pattern if needed
if "VID:PID=2E8A:0005" in port.hwid: # Look for the MicroPython Board ID
return port.device
raise IOError("Pico W not found. Please check the connection.")

if __name__ == "__main__":

if __name__ == "__main__":
# Auto-detect the Pico W port
pico_port = detect_pico_port()
print(f"Pico W detected on port: {pico_port}")
Expand Down