-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 790d0b7
Showing
9 changed files
with
2,109 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
/__pycache__ | ||
/DONOTUSE | ||
*.mp3 | ||
.env |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Flask Christmas Lights | ||
|
||
Using a Flask server to control Raspberry Pi GPIO pins connected to a power relay | ||
|
||
[Demo Video](https://youtu.be/JECr_2w_JJ8) | ||
|
||
[Medium Post](https://medium.com/@johndavidsimmons) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from flask import Flask, render_template | ||
from raspberry import RaspberryThread | ||
from light_functions import blink_all, all_pins_off, lightshow, cycle_all | ||
from xmas import russian_xmas | ||
import os | ||
|
||
# Load the env variables | ||
if os.path.exists('.env'): | ||
print('Importing environment from .env...') | ||
for line in open('.env'): | ||
var = line.strip().split('=') | ||
if len(var) == 2: | ||
os.environ[var[0]] = var[1] | ||
|
||
|
||
app = Flask(__name__) | ||
|
||
|
||
@app.route("/") | ||
def index(): | ||
return render_template("index.html") | ||
|
||
|
||
@app.route("/blink", methods=['GET']) | ||
def blink_view(): | ||
# Pause any running threads | ||
any(thread.pause() for thread in threads) | ||
|
||
# Start the target thread if it is not running | ||
if not blink_thread.isAlive(): | ||
blink_thread.start() | ||
# Unpause the thread and thus execute its function | ||
blink_thread.resume() | ||
return "blink started" | ||
|
||
|
||
@app.route("/cycleall", methods=["GET"]) | ||
def cycleall_view(): | ||
any(thread.pause() for thread in threads) | ||
if not cycle_all_thread.isAlive(): | ||
cycle_all_thread.start() | ||
cycle_all_thread.resume() | ||
return "cycle all started" | ||
|
||
|
||
@app.route("/lightshow", methods=["GET"]) | ||
def lightshow_view(): | ||
any(thread.pause() for thread in threads) | ||
if not lightshow_thread.isAlive(): | ||
lightshow_thread.start() | ||
lightshow_thread.resume() | ||
return "lightshow started" | ||
|
||
|
||
@app.route("/russianxmas", methods=["GET"]) | ||
def russianxmas_view(): | ||
any(thread.pause() for thread in threads) | ||
if not russian_xmas_thread.isAlive(): | ||
russian_xmas_thread.start() | ||
russian_xmas_thread.resume() | ||
return "russian xmas started" | ||
|
||
|
||
@app.route("/shutdown", methods=['GET']) | ||
def shutdown(): | ||
all_pins_off() | ||
any(thread.pause() for thread in threads) | ||
return "all threads paused" | ||
|
||
|
||
if __name__ == '__main__': | ||
# Create threads | ||
blink_thread = RaspberryThread(function=blink_all) | ||
lightshow_thread = RaspberryThread(function=lightshow) | ||
russian_xmas_thread = RaspberryThread(function=russian_xmas) | ||
cycle_all_thread = RaspberryThread(function=cycle_all) | ||
|
||
# collect threads | ||
threads = [ | ||
blink_thread, | ||
lightshow_thread, | ||
russian_xmas_thread, | ||
cycle_all_thread | ||
] | ||
|
||
# Run server | ||
app.run( | ||
debug=True, | ||
host=os.environ.get("IP_ADDRESS"), | ||
port=int(os.environ.get("PORT")), | ||
threaded=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import RPi.GPIO as GPIO | ||
import time | ||
import random | ||
|
||
# Pins to use | ||
# 5, 6, 13, 19, 26, 16 | ||
|
||
################# | ||
# GPIO SETUP # | ||
################# | ||
|
||
# Turn off warnings | ||
GPIO.setwarnings(False) | ||
|
||
# Set pin mapping to board, use GPIO numbers not pin numbers | ||
GPIO.setmode(GPIO.BCM) | ||
|
||
|
||
class Pin(object): | ||
def __init__(self, pin_number, jumper_color, relay_number): | ||
self.pin_number = pin_number | ||
self.jumper_color = jumper_color | ||
self.relay_number = relay_number | ||
|
||
pin1 = Pin(pin_number=5, jumper_color="green", relay_number=1) | ||
pin2 = Pin(pin_number=6, jumper_color="orange", relay_number=2) | ||
pin3 = Pin(pin_number=13, jumper_color="purple", relay_number=3) | ||
pin4 = Pin(pin_number=19, jumper_color="blue", relay_number=4) | ||
pin5 = Pin(pin_number=26, jumper_color="white", relay_number=5) | ||
pin6 = Pin(pin_number=16, jumper_color="brown", relay_number=6) | ||
|
||
pins = [ | ||
pin1, pin2, pin3, | ||
pin4, pin5, pin6, | ||
] | ||
|
||
pin_numbers = [pin.pin_number for pin in pins] | ||
relay_numbers = [pin.relay_number for pin in pins] | ||
relay_pin_map = dict(zip(relay_numbers, pin_numbers)) | ||
|
||
# GPIO.LOW = relay on, GPIO.HIGH = relay off | ||
on = lambda pin: GPIO.output(pin, GPIO.LOW) | ||
off = lambda pin: GPIO.output(pin, GPIO.HIGH) | ||
|
||
# "any" function executes the function on the iterator but doesn't return anything | ||
# This is the same as "for pin in pins: GPIO.setup" | ||
for pin in pin_numbers: | ||
GPIO.setup(pin, GPIO.OUT) | ||
|
||
###################### | ||
# GPIO FUNCTIONS # | ||
###################### | ||
|
||
|
||
def blink(*, pin_numbers: list, iterations=1, sleep=0.5) -> None: | ||
"""Turn all pins on, sleep, turn all pins off""" | ||
while iterations > 0: | ||
any(on(pin_number) for pin_number in pin_numbers) | ||
time.sleep(sleep) | ||
any(off(pin_number) for pin_number in pin_numbers) | ||
time.sleep(sleep) | ||
iterations -= 1 | ||
|
||
|
||
def step(*, pin_numbers: list, iterations=1, sleep=0.5) -> None: | ||
"""Turn a pin on then off then move onto the next pin""" | ||
while iterations > 0: | ||
for pin in pin_numbers: | ||
on(pin) | ||
time.sleep(sleep) | ||
off(pin) | ||
iterations -= 1 | ||
|
||
|
||
def climb(*, pin_numbers: list, iterations=1, sleep=0.5) -> None: | ||
"""Turn the pins on in order then off in reverse""" | ||
climb_number = len(pin_numbers) | ||
reversed_pin_numbers = reversed(pin_numbers) | ||
|
||
while iterations > 0: | ||
for index, pin in enumerate(pin_numbers): | ||
if index <= climb_number: | ||
on(pin) | ||
time.sleep(sleep) | ||
for pin in reversed_pin_numbers: | ||
off(pin) | ||
time.sleep(sleep) | ||
iterations -= 1 | ||
|
||
|
||
def lightshow(): | ||
""" | ||
Choose a random light function | ||
Execute it for a random number of iterations between 1-5 | ||
Sleep for a random time during function between .1 and .5 seconds | ||
""" | ||
all_pins = pin_numbers | ||
even_pins = pin_numbers[0:][::2] | ||
odd_pins = pin_numbers[1:][::2] | ||
random_pin = [random.choice(pin_numbers)] | ||
first_half_pins = pin_numbers[:int(len(pin_numbers) / 2)] | ||
second_half_pins = pin_numbers[int(len(pin_numbers) / 2):] | ||
random_sleep = float(str(random.uniform(.1, .5))[:4]) | ||
random_iterations = random.choice(list(range(5))) | ||
|
||
functions = [blink, step, climb] | ||
pin_configs = [ | ||
all_pins, even_pins, odd_pins, | ||
first_half_pins, second_half_pins, random_pin | ||
] | ||
|
||
random.choice(functions)( | ||
pin_numbers=random.choice(pin_configs), | ||
iterations=random_iterations, | ||
sleep=random_sleep) | ||
|
||
|
||
def blink_all(): | ||
"""Turn all pins on, sleep, turn all pins off""" | ||
any(on(pin) for pin in pin_numbers) | ||
time.sleep(0.5) | ||
any(off(pin) for pin in pin_numbers) | ||
time.sleep(0.5) | ||
|
||
def cycle_all(): | ||
"""Turn all pins on in order, sleep, turn all pins off in reverse order""" | ||
climb(pin_numbers=pin_numbers) | ||
|
||
def all_pins_off(): | ||
"""Turn off all pins""" | ||
any(off(pin) for pin in pin_numbers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import threading | ||
|
||
class RaspberryThread(threading.Thread): | ||
def __init__(self, function): | ||
self.paused = True | ||
self.state = threading.Condition() | ||
self.function = function | ||
super(RaspberryThread, self).__init__() | ||
|
||
def start(self): | ||
super(RaspberryThread, self).start() | ||
|
||
def run(self): | ||
# self.resume() #unpause self | ||
while True: | ||
with self.state: | ||
if self.paused: | ||
self.state.wait() #block until notifed | ||
while not self.paused: | ||
# Call function | ||
self.function() | ||
|
||
def resume(self): | ||
with self.state: | ||
self.paused = False | ||
self.state.notify() | ||
|
||
def pause(self): | ||
with self.state: | ||
self.paused = True |
Oops, something went wrong.