diff --git a/BridgeEmulator/HueEmulator3.py b/BridgeEmulator/HueEmulator3.py index 55f173920..4aff53782 100755 --- a/BridgeEmulator/HueEmulator3.py +++ b/BridgeEmulator/HueEmulator3.py @@ -1948,7 +1948,7 @@ def run(https, server_class=ThreadingSimpleServer, handler_class=S): Thread(target=ssdpBroadcast, args=[HOST_IP, HOST_HTTP_PORT, mac]).start() Thread(target=schedulerProcessor).start() Thread(target=syncWithLights, args=[bridge_config["lights"], bridge_config["lights_address"], bridge_config["config"]["whitelist"], bridge_config["groups"], off_if_unreachable]).start() - Thread(target=entertainmentService, args=[bridge_config["lights"], bridge_config["lights_address"], bridge_config["groups"]]).start() + Thread(target=entertainmentService, args=[bridge_config["lights"], bridge_config["lights_address"], bridge_config["groups"], HOST_IP]).start() Thread(target=run, args=[False]).start() if not args.no_serve_https: Thread(target=run, args=[True]).start() diff --git a/BridgeEmulator/functions/colors.py b/BridgeEmulator/functions/colors.py index 4889910f6..258e53ead 100644 --- a/BridgeEmulator/functions/colors.py +++ b/BridgeEmulator/functions/colors.py @@ -1,4 +1,16 @@ -def convert_rgb_xy(red,green,blue): +def rgbBrightness(rgb, brightness): + r = sorted((0, int(rgb[0] * brightness) >> 8, 255))[1] #calculate with brightness and clamp + g = sorted((0, int(rgb[1] * brightness) >> 8, 255))[1] + b = sorted((0, int(rgb[2] * brightness) >> 8, 255))[1] + return [r, g, b] + +def clampRGB(rgb): + r = sorted((0, int(rgb[0]), 255))[1] + g = sorted((0, int(rgb[1]), 255))[1] + b = sorted((0, int(rgb[2]), 255))[1] + return [r, g, b] + +def convert_rgb_xy(red, green, blue): red = pow((red + 0.055) / (1.0 + 0.055), 2.4) if red > 0.04045 else red / 12.92 green = pow((green + 0.055) / (1.0 + 0.055), 2.4) if green > 0.04045 else green / 12.92 blue = pow((blue + 0.055) / (1.0 + 0.055), 2.4) if blue > 0.04045 else blue / 12.92 @@ -51,7 +63,7 @@ def convert_xy(x, y, bri): #needed for milight hub that don't work with xy value r = 0 if r < 0 else r g = 0 if g < 0 else g b = 0 if b < 0 else b - return [int(r * bri), int(g * bri), int(b * bri)] + return clampRGB([r * bri, g * bri, b * bri]) def hsv_to_rgb(h, s, v): s = float(s / 254) @@ -84,5 +96,4 @@ def hsv_to_rgb(h, s, v): g = 0 b = x - r, g, b = int(r * 255), int(g * 255), int(b * 255) - return r, g, b + return clampRGB([r * 255, g * 255, b * 255]) diff --git a/BridgeEmulator/functions/entertainment.py b/BridgeEmulator/functions/entertainment.py index fe59a4a8c..dbe2f2217 100644 --- a/BridgeEmulator/functions/entertainment.py +++ b/BridgeEmulator/functions/entertainment.py @@ -3,11 +3,11 @@ from functions.colors import convert_rgb_xy, convert_xy from functions.lightRequest import sendLightRequest -def entertainmentService(lights, addresses, groups): +def entertainmentService(lights, addresses, groups, host_ip): serverSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) serverSocket.settimeout(3) #Set a packet timeout that we catch later serverSocket.bind(('127.0.0.1', 2101)) - fremeID = 0 + frameID = 0 lightStatus = {} syncing = False #Flag to check whether or not we had been syncing when a timeout occurs while True: @@ -26,38 +26,45 @@ def entertainmentService(lights, addresses, groups): r = int((data[i+3] * 256 + data[i+4]) / 256) g = int((data[i+5] * 256 + data[i+6]) / 256) b = int((data[i+7] * 256 + data[i+8]) / 256) + proto = addresses[str(lightId)]["protocol"] if lightId not in lightStatus: lightStatus[lightId] = {"on": False, "bri": 1} if r == 0 and g == 0 and b == 0: lights[str(lightId)]["state"]["on"] = False else: lights[str(lightId)]["state"].update({"on": True, "bri": int((r + g + b) / 3), "xy": convert_rgb_xy(r, g, b), "colormode": "xy"}) - if addresses[str(lightId)]["protocol"] in ["native", "native_multi", "native_single"]: + if proto in ["native", "native_multi", "native_single"]: if addresses[str(lightId)]["ip"] not in nativeLights: nativeLights[addresses[str(lightId)]["ip"]] = {} nativeLights[addresses[str(lightId)]["ip"]][addresses[str(lightId)]["light_nr"] - 1] = [r, g, b] - if addresses[str(lightId)]["protocol"] == "esphome": + if proto == "esphome": if addresses[str(lightId)]["ip"] not in esphomeLights: esphomeLights[addresses[str(lightId)]["ip"]] = {} bri = int(max(r,g,b)) esphomeLights[addresses[str(lightId)]["ip"]]["color"] = [r, g, b, bri] else: - if fremeID == 24: # => every seconds, increase in case the destination device is overloaded - if r == 0 and g == 0 and b == 0: + if frameID == 24: # => every seconds, increase in case the destination device is overloaded + gottaSend = False + yee = proto == "yeelight" + brABS = abs(int((r + b + g) / 3) - lightStatus[lightId]["bri"]) + if r == 0 and g == 0 and b == 0: #Turn off if color is black if lightStatus[lightId]["on"]: - sendLightRequest(str(lightId), {"on": False, "transitiontime": 3}, lights, addresses) + sendLightRequest(str(lightId), {"on": False, "transitiontime": 3}, lights, addresses, None, host_ip) lightStatus[lightId]["on"] = False - elif lightStatus[lightId]["on"] == False: - sendLightRequest(str(lightId), {"on": True, "transitiontime": 3}, lights, addresses) - lightStatus[lightId]["on"] = True - elif abs(int((r + b + g) / 3) - lightStatus[lightId]["bri"]) > 50: # to optimize, send brightness only of difference is bigger than this value - sendLightRequest(str(lightId), {"bri": int((r + b + g) / 3), "transitiontime": 3}, lights, addresses) - lightStatus[lightId]["bri"] = int((r + b + g) / 3) else: - sendLightRequest(str(lightId), {"xy": convert_rgb_xy(r, g, b), "transitiontime": 3}, lights, addresses) - fremeID += 1 - if fremeID == 25: - fremeID = 0 + if lightStatus[lightId]["on"] == False: #Turn on if color is not black + sendLightRequest(str(lightId), {"on": True, "transitiontime": 3}, lights, addresses, None, host_ip) + lightStatus[lightId]["on"] = True + elif brABS > 50: # to optimize, send brightness only if difference is bigger than this value + sendLightRequest(str(lightId), {"bri": int((r + b + g) / 3), "transitiontime": 150 / brABS}, lights, addresses, None, host_ip) + lightStatus[lightId]["bri"] = int((r + b + g) / 3) + else: + gottaSend = True + if gottaSend or yee: + sendLightRequest(str(lightId), {"xy": convert_rgb_xy(r, g, b), "transitiontime": 3}, lights, addresses, [r, g, b], host_ip) + frameID += 1 + if frameID == 25: + frameID = 0 i = i + 9 elif data[14] == 1: #cie colorspace i = 16 @@ -79,16 +86,13 @@ def entertainmentService(lights, addresses, groups): if addresses[str(lightId)]["protocol"] == "esphome": if addresses[str(lightId)]["ip"] not in esphomeLights: esphomeLights[addresses[str(lightId)]["ip"]] = {} - color = convert_xy(x, y, bri) - r = int(color[0]) - g = int(color[1]) - b = int(color[2]) + r, g, b = convert_xy(x, y, bri) esphomeLights[addresses[str(lightId)]["ip"]]["color"] = [r, g, b, bri] else: - fremeID += 1 - if fremeID == 24 : #24 = every seconds, increase in case the destination device is overloaded - sendLightRequest(str(lightId), {"xy": [x,y]}, lights, addresses) - fremeID = 0 + frameID += 1 + if frameID == 24 : #24 = every seconds, increase in case the destination device is overloaded + sendLightRequest(str(lightId), {"xy": [x,y]}, lights, addresses, None, host_ip) + frameID = 0 i = i + 9 if len(nativeLights) is not 0: for ip in nativeLights.keys(): diff --git a/BridgeEmulator/functions/lightRequest.py b/BridgeEmulator/functions/lightRequest.py index 174fb8702..0f668137c 100755 --- a/BridgeEmulator/functions/lightRequest.py +++ b/BridgeEmulator/functions/lightRequest.py @@ -1,20 +1,25 @@ import logging, json from functions.request import sendRequest -from functions.colors import convert_rgb_xy, convert_xy +from functions.colors import convert_rgb_xy, convert_xy, rgbBrightness from subprocess import check_output from protocols import protocols -from datetime import datetime +from datetime import datetime, timedelta from time import sleep from functions.updateGroup import updateGroupStats -def sendLightRequest(light, data, lights, addresses): +def sendLightRequest(light, data, lights, addresses, rgb = None, entertainmentHostIP = None): payload = {} if light in addresses: protocol_name = addresses[light]["protocol"] for protocol in protocols: if "protocols." + protocol_name == protocol.__name__: try: - light_state = protocol.set_light(addresses[light], lights[light], data) + if entertainmentHostIP and protocol_name == "yeelight": + protocol.enableMusic(addresses[light]["ip"], entertainmentHostIP) + if protocol_name in ["yeelight", "mi_box", "esphome", "tasmota"]: + protocol.set_light(addresses[light], lights[light], data, rgb) + else: + protocol.set_light(addresses[light], lights[light], data) except Exception as e: lights[light]["state"]["reachable"] = False logging.warning(lights[light]["name"] + " light not reachable: %s", e) @@ -70,7 +75,10 @@ def sendLightRequest(light, data, lights, addresses): color_data["t"] = ct255 elif colormode == "xy": color_data["m"] = 3 - (color_data["r"], color_data["g"], color_data["b"]) = convert_xy(xy[0], xy[1], 255) + if rgb: + (color_data["r"], color_data["g"], color_data["b"]) = rgbBrightness(rgb, bri) + else: + (color_data["r"], color_data["g"], color_data["b"]) = convert_xy(xy[0], xy[1], bri) url += "&color="+json.dumps(color_data) url += "&brightness=" + str(round(float(bri)/255*100)) @@ -105,7 +113,10 @@ def sendLightRequest(light, data, lights, addresses): payload["saturation"] = value * 100 / 255 elif key == "xy": payload["color"] = {} - (payload["color"]["r"], payload["color"]["g"], payload["color"]["b"]) = convert_xy(value[0], value[1], lights[light]["state"]["bri"]) + if rgb: + payload["color"]["r"], payload["color"]["g"], payload["color"]["b"] = rgbBrightness(rgb, lights[light]["state"]["bri"]) + else: + payload["color"]["r"], payload["color"]["g"], payload["color"]["b"] = convert_xy(value[0], value[1], lights[light]["state"]["bri"]) logging.info(json.dumps(payload)) elif addresses[light]["protocol"] == "ikea_tradfri": #IKEA Tradfri bulb @@ -166,8 +177,11 @@ def sendLightRequest(light, data, lights, addresses): logging.info(pretty_json(data)) bri = data["bri"] if "bri" in data else lights[light]["state"]["bri"] xy = data["xy"] if "xy" in data else lights[light]["state"]["xy"] - rgb = convert_xy(xy[0], xy[1], bri) - msg = bytearray([0x41, rgb[0], rgb[1], rgb[2], 0x00, 0xf0, 0x0f]) + if rgb: + color = rgbBrightness(rgb, bri) + else: + color = convert_xy(xy[0], xy[1], bri) + msg = bytearray([0x41, color[0], color[1], color[2], 0x00, 0xf0, 0x0f]) checksum = sum(msg) & 0xFF msg.append(checksum) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP @@ -301,8 +315,13 @@ def syncWithLights(lights, addresses, users, groups, off_if_unreachable): #updat i = 0 while i < 300: #sync with lights every 300 seconds or instant if one user is connected for user in users.keys(): - if users[user]["last use date"] == datetime.now().strftime("%Y-%m-%dT%H:%M:%S"): - i = 300 - break + lu = users[user]["last use date"] + try: #in case if last use is not a proper datetime + lu = datetime.strptime(lu, "%Y-%m-%dT%H:%M:%S") + if abs(datetime.now() - lu) <= timedelta(seconds = 2): + i = 300 + break + except: + pass i += 1 sleep(1) diff --git a/BridgeEmulator/functions/network_OpenWrt.py b/BridgeEmulator/functions/network_OpenWrt.py index 7310953a9..749280a85 100644 --- a/BridgeEmulator/functions/network_OpenWrt.py +++ b/BridgeEmulator/functions/network_OpenWrt.py @@ -11,9 +11,16 @@ def get_interface_ip(ifname): bytes(ifname[:15], 'utf-8')))[20:24]) def getIpAddress(): - ip = socket.gethostbyname(socket.gethostname()) - if ip.startswith("127.") and os.name != "nt": + ip = None + + try: + ip = socket.gethostbyname(socket.gethostname()) + except: + pass + + if (not ip or ip.startswith("127.")) and os.name != "nt": interfaces = [ + "br0", "br-lan", "eth0", "eth1", diff --git a/BridgeEmulator/protocols/esphome.py b/BridgeEmulator/protocols/esphome.py index eb6eb9b60..cd3b14a88 100644 --- a/BridgeEmulator/protocols/esphome.py +++ b/BridgeEmulator/protocols/esphome.py @@ -1,293 +1,296 @@ -import json -import logging -import random -import requests - -import socket -import sys - -from time import sleep -from subprocess import check_output -from functions import light_types, nextFreeId -from functions.colors import convert_rgb_xy, convert_xy, hsv_to_rgb -from functions.network import getIpAddress - -def getRequest(address, request_data, timeout=3): - - head = {"Content-type": "application/json"} - response = requests.get("http://" + address + request_data, timeout=timeout, headers=head) - return response.text - -def postRequest(address, request_data, timeout=3): - head = {"Content-type": "application/json"} - response = requests.post("http://" + address + request_data, timeout=3, headers=head) - return response.text - -def addRequest(request_data, data_type, new_data): - if ("?" in request_data): - request_data = request_data + "&" + str(data_type) + "=" + str(new_data) - else: - request_data = request_data + "?" + str(data_type) + "=" + str(new_data) - return request_data - -def getLightType(light, address, data): - request_data = "" - if address["esphome_model"] == "ESPHome-RGBW": - if "xy" in data: #revised according to hue api docs - request_data = request_data + "/light/color_led" - elif "ct" in data: - request_data = request_data + "/light/white_led" - elif ("hue" in data) or ("sat" in data): - request_data = request_data + "/light/color_led" - else: - if light["state"]["colormode"] == "xy": - request_data = request_data + "/light/color_led" - elif light["state"]["colormode"] == "ct": - request_data = request_data + "/light/white_led" - elif light["state"]["colormode"] == "hs": - request_data = request_data + "/light/color_led" - elif address["esphome_model"] == "ESPHome-CT": - request_data = request_data + "/light/white_led" - elif address["esphome_model"] == "ESPHome-RGB": - request_data = request_data + "/light/color_led" - elif address["esphome_model"] == "ESPHome-Dimmable": - request_data = request_data + "/light/dimmable_led" - elif address["esphome_model"] == "ESPHome-Toggle": - request_data = request_data + "/light/toggle_led" - - return request_data - -def discover(bridge_config, new_lights): - logging.debug("ESPHome: invoked!") - - device_ips = check_output("nmap " + getIpAddress() + "/24 -p80 --open -n | grep report | cut -d ' ' -f5", shell=True).decode('utf-8').rstrip("\n").split("\n") - del device_ips[-1] #delete last empty element in list - for ip in device_ips: - try: - logging.debug ( "ESPHome: probing ip " + ip) - response = requests.get ("http://" + ip + "/text_sensor/light_id", timeout=3) - device = json.loads(response.text)['state'].split(';') #get device data - mac = device[1] - device_name = device[2] - ct_boost = device[3] - rgb_boost = device[4] - if response.status_code == 200 and device[0] == "esphome_diyhue_light": - logging.debug("ESPHome: Found " + device_name + " at ip " + ip) - white_response = requests.get ("http://" + ip + "/light/white_led", timeout=3) - color_response = requests.get ("http://" + ip + "/light/color_led", timeout=3) - dim_response = requests.get ("http://" + ip + "/light/dimmable_led", timeout=3) - toggle_response = requests.get ("http://" + ip + "/light/toggle_led", timeout=3) - - if (white_response.status_code != 200 and color_response.status_code != 200 and dim_response != 200 and toggle_response != 200): - logging.debug("ESPHome: Device has improper configuration! Exiting.") - raise - elif (white_response.status_code == 200 and color_response.status_code == 200): - logging.debug("ESPHome: " + device_name + " is a RGBW ESPHome device") - white_device_data = json.loads(white_response.text) - color_device_data = json.loads(color_response.text) - properties = {"rgb": True, "ct": True, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} - esphome_model = "ESPHome-RGBW" - modelid = "LCT015" - elif (white_response.status_code == 200): - logging.debug("ESPHome: " + device_name + " is a CT ESPHome device") - white_device_data = json.loads(white_response.text) - properties = {"rgb": False, "ct": True, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} - esphome_model = "ESPHome-CT" - modelid = "LWB010" - elif (color_response.status_code == 200): - logging.debug("ESPHome: " + device_name + " is a RGB ESPHome device") - color_device_data = json.loads(color_response.text) - properties = {"rgb": True, "ct": False, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} - esphome_model = "ESPHome-RGB" - modelid = "ESPHome-RGB" - elif (dim_response.status_code == 200): - logging.debug("ESPHome: " + device_name + " is a Dimmable ESPHome device") - dim_device_data = json.loads(dim_response.text) - properties = {"rgb": False, "ct": False, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} - esphome_model = "ESPHome-Dimmable" - modelid = "ESPHome-Dimmable" - elif (toggle_response.status_code == 200): - logging.debug("ESPHome: " + device_name + " is a Toggle ESPHome device") - toggle_device_data = json.loads(toggle_response.text) - properties = {"rgb": False, "ct": False, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} - esphome_model = "ESPHome-Toggle" - modelid = "ESPHome-Toggle" - - device_exist = False - for light in bridge_config["lights_address"].keys(): - if bridge_config["lights_address"][light]["protocol"] == "esphome" and bridge_config["lights_address"][light]["id"].split('.')[0] == properties["id"].split('.')[0]: - device_exist = True - bridge_config["lights_address"][light]["ip"] = properties["ip"] - bridge_config["lights_address"][light]["ct_boost"] = properties["ct_boost"] - bridge_config["lights_address"][light]["rgb_boost"] = properties["rgb_boost"] - logging.debug("ESPHome: light id " + properties["id"] + " already exists, updating device data...") - break - if (not device_exist): - light_name = "ESPHome id " + properties["id"][-8:] if properties["name"] == "" else properties["name"] - logging.debug("ESPHome: Adding ESPHome " + properties["id"]) - new_light_id = nextFreeId(bridge_config, "lights") - bridge_config["lights"][new_light_id] = {} - bridge_config["lights"][new_light_id]["state"] = light_types[modelid]["state"] - bridge_config["lights"][new_light_id]["type"] = light_types[modelid]["type"] - bridge_config["lights"][new_light_id]["name"] = light_name - bridge_config["lights"][new_light_id]["modelid"] = modelid - bridge_config["lights"][new_light_id]["manufacturername"] = light_types[modelid]["manufacturername"] - bridge_config["lights"][new_light_id]["swversion"] = light_types[modelid]["swversion"] - bridge_config["lights"][new_light_id]["config"] = light_types[modelid]["config"] - bridge_config["lights"][new_light_id]["uniqueid"] = mac - if modelid == "LCT015": - bridge_config["lights"][new_light_id]["capabilities"] = light_types[modelid]["capabilities"] - bridge_config["lights"][new_light_id]["streaming"] = light_types[modelid]["streaming"] - #new_lights.update({new_light_id: {"name": light_name}}) - bridge_config["lights_address"][new_light_id] = {} - bridge_config["lights_address"][new_light_id]["ip"] = properties["ip"] - bridge_config["lights_address"][new_light_id]["id"] = properties["id"] - bridge_config["lights_address"][new_light_id]["protocol"] = "esphome" - bridge_config["lights_address"][new_light_id]["rgb_boost"] = rgb_boost - bridge_config["lights_address"][new_light_id]["ct_boost"] = ct_boost - bridge_config["lights_address"][new_light_id]["esphome_model"] = esphome_model - - except Exception as e: - logging.debug("ESPHome: ip " + ip + " is unknown device, " + str(e)) - -def set_light(address, light, data): - logging.debug("ESPHome: invoked! IP=" + address["ip"]) - logging.debug(light["modelid"]) - logging.debug(data) - - ct_boost = int(address["ct_boost"]) - rgb_boost = int(address["rgb_boost"]) - request_data = "" - if ("alert" in data) and (data['alert'] == "select"): #one breath cycle, temporary for now until breath implemented in esphome firmware - request_data = request_data + "/switch/alert/turn_on" - # elif ("alert" in data) and (data['alert'] == "lselect"): #15 second breath cycle - # request_data = request_data + "/switch/alert/turn_on" - # elif ("alert" in data) and (data['alert'] == "none"): - # request_data = request_data + "/switch/alert/turn_off" - else: - request_data = request_data + getLightType(light, address, data) - if "white_led" in request_data: - postRequest(address["ip"], "/light/color_led/turn_off") - else: - postRequest(address["ip"], "/light/white_led/turn_off") - if "on" in data: - if not(data['on']): - request_data = request_data + "/turn_off" - else: - request_data = request_data + "/turn_on" - else: - request_data = request_data + "/turn_on" - if address["esphome_model"] is not "ESPHome-Toggle": - if "bri" in data: - brightness = int(data['bri']) - if address["esphome_model"] == "ESPHome-RGBW": - if light["state"]["colormode"] == "ct": - brightness = ct_boost + brightness - elif light["state"]["colormode"] == "xy": - brightness = rgb_boost + brightness - elif address["esphome_model"] == "ESPHome-CT": - brightness = ct_boost + brightness - elif address["esphome_model"] == "ESPHome-RGB": - brightness = rgb_boost + brightness - elif address["esphome_model"] == "ESPHome-Dimmable": - brightness = ct_boost + brightness - if brightness > 255: # do not send brightness values over 255 - brightness = 255 - request_data = addRequest(request_data, "brightness", brightness) - if address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-RGB", "ESPHome-CT"]: - if ("xy" in data) and (address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-RGB"]): - color = convert_xy(data['xy'][0], data['xy'][1], 255) - request_data = addRequest(request_data, "r", color[0]) - request_data = addRequest(request_data, "g", color[1]) - request_data = addRequest(request_data, "b", color[2]) - elif "ct" in data and (address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-CT"]): - request_data = addRequest(request_data, "color_temp", data['ct']) - elif (("hue" in data) or ("sat" in data)) and (address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-RGB"]): - if (("hue" in data) and ("sat" in data)): - hue = data['hue'] - sat = data['sat'] - elif "hue" in data: - hue = data['hue'] - sat = light["state"]["sat"] - elif "sat" in data: - hue = light["state"]["hue"] - sat = data['sat'] - if "bri" not in data: - bri = light["state"]["bri"] - else: - bri = data['bri'] - color = hsv_to_rgb(hue, sat, bri) - request_data = addRequest(request_data, "r", color[0]) - request_data = addRequest(request_data, "g", color[1]) - request_data = addRequest(request_data, "b", color[2]) - if "transitiontime" in data: - request_data = addRequest(request_data, "transition", data['transitiontime']/10) - else: #Utilize default interval of 0.4 - request_data = addRequest(request_data, "transition", 0.4) - - postRequest(address["ip"], request_data) - - - - -def get_light_state(address, light): - logging.debug("ESPHome: invoked!") - state = {} - if address["esphome_model"] == "ESPHome-RGBW": - white_response = requests.get ("http://" + address["ip"] + "/light/white_led", timeout=3) - color_response = requests.get ("http://" + address["ip"] + "/light/color_led", timeout=3) - white_device = json.loads(white_response.text) #get white device data - color_device = json.loads(color_response.text) #get color device data - if white_device['state'] == 'OFF' and color_device['state'] == 'OFF': - state['on'] = False - elif white_device['state'] == 'ON': - state['on'] = True - state['ct'] = int(white_device['color_temp']) - state['bri'] = int(white_device['brightness']) - state['colormode'] = "ct" - elif color_device['state'] == 'ON': - state['on'] = True - state['xy'] = convert_rgb_xy(int(color_device['color']['r']), int(color_device['color']['g']), int(color_device['color']['b'])) - state['bri'] = int(color_device['brightness']) - state['colormode'] = "xy" - - elif address["esphome_model"] == "ESPHome-CT": - white_response = requests.get ("http://" + address["ip"] + "/light/white_led", timeout=3) - white_device = json.loads(white_response.text) #get white device data - if white_device['state'] == 'OFF': - state['on'] = False - elif white_device['state'] == 'ON': - state['on'] = True - state['ct'] = int(white_device['color_temp']) - state['bri'] = int(white_device['brightness']) - state['colormode'] = "ct" - - elif address["esphome_model"] == "ESPHome-RGB": - color_response = requests.get ("http://" + address["ip"] + "/light/color_led", timeout=3) - color_device = json.loads(color_response.text) - if color_device['state'] == 'OFF': - state['on'] = False - elif color_device['state'] == 'ON': - state['on'] = True - state['xy'] = convert_rgb_xy(int(color_device['color']['r']), int(color_device['color']['g']), int(color_device['color']['b'])) - state['bri'] = int(color_device['brightness']) - state['colormode'] = "xy" - - elif address["esphome_model"] == "ESPHome-Dimmable": - dimmable_response = requests.get ("http://" + address["ip"] + "/light/dimmable_led", timeout=3) - dimmable_device = json.loads(dimmable_response.text) - if dimmable_device['state'] == 'OFF': - state['on'] = False - elif dimmable_device['state'] == 'ON': - state['on'] = True - state['bri'] = int(dimmable_device['brightness']) - - elif address["esphome_model"] == "ESPHome-Toggle": - toggle_response = requests.get ("http://" + address["ip"] + "/light/toggle_led", timeout=3) - toggle_device = json.loads(toggle_response.text) - if toggle_device['state'] == 'OFF': - state['on'] = False - elif toggle_device['state'] == 'ON': - state['on'] = True - - return state +import json +import logging +import random +import requests + +import socket +import sys + +from time import sleep +from subprocess import check_output +from functions import light_types, nextFreeId +from functions.colors import convert_rgb_xy, convert_xy, hsv_to_rgb, rgbBrightness +from functions.network import getIpAddress + +def getRequest(address, request_data, timeout=3): + + head = {"Content-type": "application/json"} + response = requests.get("http://" + address + request_data, timeout=timeout, headers=head) + return response.text + +def postRequest(address, request_data, timeout=3): + head = {"Content-type": "application/json"} + response = requests.post("http://" + address + request_data, timeout=3, headers=head) + return response.text + +def addRequest(request_data, data_type, new_data): + if ("?" in request_data): + request_data = request_data + "&" + str(data_type) + "=" + str(new_data) + else: + request_data = request_data + "?" + str(data_type) + "=" + str(new_data) + return request_data + +def getLightType(light, address, data): + request_data = "" + if address["esphome_model"] == "ESPHome-RGBW": + if "xy" in data: #revised according to hue api docs + request_data = request_data + "/light/color_led" + elif "ct" in data: + request_data = request_data + "/light/white_led" + elif ("hue" in data) or ("sat" in data): + request_data = request_data + "/light/color_led" + else: + if light["state"]["colormode"] == "xy": + request_data = request_data + "/light/color_led" + elif light["state"]["colormode"] == "ct": + request_data = request_data + "/light/white_led" + elif light["state"]["colormode"] == "hs": + request_data = request_data + "/light/color_led" + elif address["esphome_model"] == "ESPHome-CT": + request_data = request_data + "/light/white_led" + elif address["esphome_model"] == "ESPHome-RGB": + request_data = request_data + "/light/color_led" + elif address["esphome_model"] == "ESPHome-Dimmable": + request_data = request_data + "/light/dimmable_led" + elif address["esphome_model"] == "ESPHome-Toggle": + request_data = request_data + "/light/toggle_led" + + return request_data + +def discover(bridge_config, new_lights): + logging.debug("ESPHome: invoked!") + + device_ips = check_output("nmap " + getIpAddress() + "/24 -p80 --open -n | grep report | cut -d ' ' -f5", shell=True).decode('utf-8').rstrip("\n").split("\n") + del device_ips[-1] #delete last empty element in list + for ip in device_ips: + try: + logging.debug ( "ESPHome: probing ip " + ip) + response = requests.get ("http://" + ip + "/text_sensor/light_id", timeout=3) + device = json.loads(response.text)['state'].split(';') #get device data + mac = device[1] + device_name = device[2] + ct_boost = device[3] + rgb_boost = device[4] + if response.status_code == 200 and device[0] == "esphome_diyhue_light": + logging.debug("ESPHome: Found " + device_name + " at ip " + ip) + white_response = requests.get ("http://" + ip + "/light/white_led", timeout=3) + color_response = requests.get ("http://" + ip + "/light/color_led", timeout=3) + dim_response = requests.get ("http://" + ip + "/light/dimmable_led", timeout=3) + toggle_response = requests.get ("http://" + ip + "/light/toggle_led", timeout=3) + + if (white_response.status_code != 200 and color_response.status_code != 200 and dim_response != 200 and toggle_response != 200): + logging.debug("ESPHome: Device has improper configuration! Exiting.") + raise + elif (white_response.status_code == 200 and color_response.status_code == 200): + logging.debug("ESPHome: " + device_name + " is a RGBW ESPHome device") + white_device_data = json.loads(white_response.text) + color_device_data = json.loads(color_response.text) + properties = {"rgb": True, "ct": True, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} + esphome_model = "ESPHome-RGBW" + modelid = "LCT015" + elif (white_response.status_code == 200): + logging.debug("ESPHome: " + device_name + " is a CT ESPHome device") + white_device_data = json.loads(white_response.text) + properties = {"rgb": False, "ct": True, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} + esphome_model = "ESPHome-CT" + modelid = "LWB010" + elif (color_response.status_code == 200): + logging.debug("ESPHome: " + device_name + " is a RGB ESPHome device") + color_device_data = json.loads(color_response.text) + properties = {"rgb": True, "ct": False, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} + esphome_model = "ESPHome-RGB" + modelid = "ESPHome-RGB" + elif (dim_response.status_code == 200): + logging.debug("ESPHome: " + device_name + " is a Dimmable ESPHome device") + dim_device_data = json.loads(dim_response.text) + properties = {"rgb": False, "ct": False, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} + esphome_model = "ESPHome-Dimmable" + modelid = "ESPHome-Dimmable" + elif (toggle_response.status_code == 200): + logging.debug("ESPHome: " + device_name + " is a Toggle ESPHome device") + toggle_device_data = json.loads(toggle_response.text) + properties = {"rgb": False, "ct": False, "ip": ip, "name": device_name, "id": mac, "mac": mac, "ct_boost": ct_boost, "rgb_boost": rgb_boost} + esphome_model = "ESPHome-Toggle" + modelid = "ESPHome-Toggle" + + device_exist = False + for light in bridge_config["lights_address"].keys(): + if bridge_config["lights_address"][light]["protocol"] == "esphome" and bridge_config["lights_address"][light]["id"].split('.')[0] == properties["id"].split('.')[0]: + device_exist = True + bridge_config["lights_address"][light]["ip"] = properties["ip"] + bridge_config["lights_address"][light]["ct_boost"] = properties["ct_boost"] + bridge_config["lights_address"][light]["rgb_boost"] = properties["rgb_boost"] + logging.debug("ESPHome: light id " + properties["id"] + " already exists, updating device data...") + break + if (not device_exist): + light_name = "ESPHome id " + properties["id"][-8:] if properties["name"] == "" else properties["name"] + logging.debug("ESPHome: Adding ESPHome " + properties["id"]) + new_light_id = nextFreeId(bridge_config, "lights") + bridge_config["lights"][new_light_id] = {} + bridge_config["lights"][new_light_id]["state"] = light_types[modelid]["state"] + bridge_config["lights"][new_light_id]["type"] = light_types[modelid]["type"] + bridge_config["lights"][new_light_id]["name"] = light_name + bridge_config["lights"][new_light_id]["modelid"] = modelid + bridge_config["lights"][new_light_id]["manufacturername"] = light_types[modelid]["manufacturername"] + bridge_config["lights"][new_light_id]["swversion"] = light_types[modelid]["swversion"] + bridge_config["lights"][new_light_id]["config"] = light_types[modelid]["config"] + bridge_config["lights"][new_light_id]["uniqueid"] = mac + if modelid == "LCT015": + bridge_config["lights"][new_light_id]["capabilities"] = light_types[modelid]["capabilities"] + bridge_config["lights"][new_light_id]["streaming"] = light_types[modelid]["streaming"] + #new_lights.update({new_light_id: {"name": light_name}}) + bridge_config["lights_address"][new_light_id] = {} + bridge_config["lights_address"][new_light_id]["ip"] = properties["ip"] + bridge_config["lights_address"][new_light_id]["id"] = properties["id"] + bridge_config["lights_address"][new_light_id]["protocol"] = "esphome" + bridge_config["lights_address"][new_light_id]["rgb_boost"] = rgb_boost + bridge_config["lights_address"][new_light_id]["ct_boost"] = ct_boost + bridge_config["lights_address"][new_light_id]["esphome_model"] = esphome_model + + except Exception as e: + logging.debug("ESPHome: ip " + ip + " is unknown device, " + str(e)) + +def set_light(address, light, data, rgb = None): + logging.debug("ESPHome: invoked! IP=" + address["ip"]) + logging.debug(light["modelid"]) + logging.debug(data) + + ct_boost = int(address["ct_boost"]) + rgb_boost = int(address["rgb_boost"]) + request_data = "" + if ("alert" in data) and (data['alert'] == "select"): #one breath cycle, temporary for now until breath implemented in esphome firmware + request_data = request_data + "/switch/alert/turn_on" + # elif ("alert" in data) and (data['alert'] == "lselect"): #15 second breath cycle + # request_data = request_data + "/switch/alert/turn_on" + # elif ("alert" in data) and (data['alert'] == "none"): + # request_data = request_data + "/switch/alert/turn_off" + else: + request_data = request_data + getLightType(light, address, data) + if "white_led" in request_data: + postRequest(address["ip"], "/light/color_led/turn_off") + else: + postRequest(address["ip"], "/light/white_led/turn_off") + if "on" in data: + if not(data['on']): + request_data = request_data + "/turn_off" + else: + request_data = request_data + "/turn_on" + else: + request_data = request_data + "/turn_on" + if address["esphome_model"] is not "ESPHome-Toggle": + if "bri" in data: + brightness = int(data['bri']) + if address["esphome_model"] == "ESPHome-RGBW": + if light["state"]["colormode"] == "ct": + brightness = ct_boost + brightness + elif light["state"]["colormode"] == "xy": + brightness = rgb_boost + brightness + elif address["esphome_model"] == "ESPHome-CT": + brightness = ct_boost + brightness + elif address["esphome_model"] == "ESPHome-RGB": + brightness = rgb_boost + brightness + elif address["esphome_model"] == "ESPHome-Dimmable": + brightness = ct_boost + brightness + if brightness > 255: # do not send brightness values over 255 + brightness = 255 + request_data = addRequest(request_data, "brightness", brightness) + if address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-RGB", "ESPHome-CT"]: + if ("xy" in data) and (address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-RGB"]): + if rgb: + color = rgbBrightness(rgb, light["state"]["bri"]) + else: + color = convert_xy(data['xy'][0], data['xy'][1], light["state"]["bri"]) + request_data = addRequest(request_data, "r", color[0]) + request_data = addRequest(request_data, "g", color[1]) + request_data = addRequest(request_data, "b", color[2]) + elif "ct" in data and (address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-CT"]): + request_data = addRequest(request_data, "color_temp", data['ct']) + elif (("hue" in data) or ("sat" in data)) and (address["esphome_model"] in ["ESPHome-RGBW", "ESPHome-RGB"]): + if (("hue" in data) and ("sat" in data)): + hue = data['hue'] + sat = data['sat'] + elif "hue" in data: + hue = data['hue'] + sat = light["state"]["sat"] + elif "sat" in data: + hue = light["state"]["hue"] + sat = data['sat'] + if "bri" not in data: + bri = light["state"]["bri"] + else: + bri = data['bri'] + color = hsv_to_rgb(hue, sat, bri) + request_data = addRequest(request_data, "r", color[0]) + request_data = addRequest(request_data, "g", color[1]) + request_data = addRequest(request_data, "b", color[2]) + if "transitiontime" in data: + request_data = addRequest(request_data, "transition", data['transitiontime']/10) + else: #Utilize default interval of 0.4 + request_data = addRequest(request_data, "transition", 0.4) + + postRequest(address["ip"], request_data) + + + + +def get_light_state(address, light): + logging.debug("ESPHome: invoked!") + state = {} + if address["esphome_model"] == "ESPHome-RGBW": + white_response = requests.get ("http://" + address["ip"] + "/light/white_led", timeout=3) + color_response = requests.get ("http://" + address["ip"] + "/light/color_led", timeout=3) + white_device = json.loads(white_response.text) #get white device data + color_device = json.loads(color_response.text) #get color device data + if white_device['state'] == 'OFF' and color_device['state'] == 'OFF': + state['on'] = False + elif white_device['state'] == 'ON': + state['on'] = True + state['ct'] = int(white_device['color_temp']) + state['bri'] = int(white_device['brightness']) + state['colormode'] = "ct" + elif color_device['state'] == 'ON': + state['on'] = True + state['xy'] = convert_rgb_xy(int(color_device['color']['r']), int(color_device['color']['g']), int(color_device['color']['b'])) + state['bri'] = int(color_device['brightness']) + state['colormode'] = "xy" + + elif address["esphome_model"] == "ESPHome-CT": + white_response = requests.get ("http://" + address["ip"] + "/light/white_led", timeout=3) + white_device = json.loads(white_response.text) #get white device data + if white_device['state'] == 'OFF': + state['on'] = False + elif white_device['state'] == 'ON': + state['on'] = True + state['ct'] = int(white_device['color_temp']) + state['bri'] = int(white_device['brightness']) + state['colormode'] = "ct" + + elif address["esphome_model"] == "ESPHome-RGB": + color_response = requests.get ("http://" + address["ip"] + "/light/color_led", timeout=3) + color_device = json.loads(color_response.text) + if color_device['state'] == 'OFF': + state['on'] = False + elif color_device['state'] == 'ON': + state['on'] = True + state['xy'] = convert_rgb_xy(int(color_device['color']['r']), int(color_device['color']['g']), int(color_device['color']['b'])) + state['bri'] = int(color_device['brightness']) + state['colormode'] = "xy" + + elif address["esphome_model"] == "ESPHome-Dimmable": + dimmable_response = requests.get ("http://" + address["ip"] + "/light/dimmable_led", timeout=3) + dimmable_device = json.loads(dimmable_response.text) + if dimmable_device['state'] == 'OFF': + state['on'] = False + elif dimmable_device['state'] == 'ON': + state['on'] = True + state['bri'] = int(dimmable_device['brightness']) + + elif address["esphome_model"] == "ESPHome-Toggle": + toggle_response = requests.get ("http://" + address["ip"] + "/light/toggle_led", timeout=3) + toggle_device = json.loads(toggle_response.text) + if toggle_device['state'] == 'OFF': + state['on'] = False + elif toggle_device['state'] == 'ON': + state['on'] = True + + return state diff --git a/BridgeEmulator/protocols/mi_box.py b/BridgeEmulator/protocols/mi_box.py index baa45cf75..710f1aa0c 100644 --- a/BridgeEmulator/protocols/mi_box.py +++ b/BridgeEmulator/protocols/mi_box.py @@ -1,5 +1,5 @@ import logging, binascii, socket, colorsys, time -from functions.colors import convert_rgb_xy, convert_xy +from functions.colors import convert_rgb_xy, convert_xy, rgbBrightness #todo: add support for multiple mi boxes? these globals don't look nice commandCounter = 0 @@ -8,7 +8,7 @@ sock = None lastSentMessageTime = 0 -def set_light(address, light, data): +def set_light(address, light, data, rgb = None): for key, value in data.items(): light["state"][key] = value @@ -18,7 +18,10 @@ def set_light(address, light, data): colormode = light["state"]["colormode"] if colormode == "xy": xy = light["state"]["xy"] - (r,g,b) = convert_xy(xy[0], xy[1], 100.0) + if rgb: + r, g, b = rgbBrightness(rgb, light["state"]["bri"]) + else: + r, g, b = convert_xy(xy[0], xy[1], light["state"]["bri"]) (hue, saturation, value) = colorsys.rgb_to_hsv(r,g,b) sendHueCmd(address, hue*255) sendSaturationCmd(address, (1-saturation)*100) diff --git a/BridgeEmulator/protocols/tasmota.py b/BridgeEmulator/protocols/tasmota.py index cab82fdb2..5d298ece3 100755 --- a/BridgeEmulator/protocols/tasmota.py +++ b/BridgeEmulator/protocols/tasmota.py @@ -9,7 +9,7 @@ from time import sleep from subprocess import check_output from functions import light_types, nextFreeId -from functions.colors import convert_rgb_xy, convert_xy +from functions.colors import convert_rgb_xy, convert_xy, rgbBrightness from functions.network import getIpAddress @@ -60,7 +60,7 @@ def discover(bridge_config, new_lights): -def set_light(address, light, data): +def set_light(address, light, data, rgb = None): logging.debug("tasmota: invoked! IP=" + address["ip"]) for key, value in data.items(): @@ -77,11 +77,11 @@ def set_light(address, light, data): elif key == "ct": color = {} elif key == "xy": - color = convert_xy(value[0], value[1], light["state"]["bri"]) - #sendRequest ("http://"+address["ip"]+"/cm?cmnd=Color%20" + str(color[0]) + "," + str(color[1]) + "," + str(color[2])) - sendRequest ("http://"+address["ip"]+"/cm?cmnd=Color%20" + rgb_to_hex((color[0],color[1],color[2]))) - logging.debug('request hex: '+rgb_to_hex((color[0],color[1],color[2]))) - + if rgb: + color = rgbBrightness(rgb, light["state"]["bri"]) + else: + color = convert_xy(value[0], value[1], light["state"]["bri"]) + sendRequest ("http://"+address["ip"]+"/cm?cmnd=Color%20" + str(color[0]) + "," + str(color[1]) + "," + str(color[2])) elif key == "alert": if value == "select": sendRequest ("http://" + address["ip"] + "/cm?cmnd=dimmer%20100") diff --git a/BridgeEmulator/protocols/yeelight.py b/BridgeEmulator/protocols/yeelight.py index c4bec757d..486cc4281 100644 --- a/BridgeEmulator/protocols/yeelight.py +++ b/BridgeEmulator/protocols/yeelight.py @@ -5,7 +5,9 @@ import sys from functions import light_types, nextFreeId -from functions.colors import convert_rgb_xy, convert_xy +from functions.colors import convert_rgb_xy, convert_xy, rgbBrightness + +Connections = {} def discover(bridge_config, new_lights): group = ("239.255.255.250", 1982) @@ -66,24 +68,31 @@ def discover(bridge_config, new_lights): sock.close() break -def command(ip, api_method, param): +def command(ip, light, api_method, param): + if ip in Connections: + c = Connections[ip] + else: + c = YeelightConnection(ip) + Connections[ip] = c try: - tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - tcp_socket.settimeout(5) - tcp_socket.connect((ip, int(55443))) - msg = json.dumps({"id": 1, "method": api_method, "params": param}) + "\r\n" - tcp_socket.send(msg.encode()) - tcp_socket.close() - except Exception as e: - logging.warning("Yeelight command error: %s", e) + c.command(api_method, param) + finally: + if not c._music and c._connected: + c.disconnect() +def set_light(address, light, data, rgb = None): + ip = address["ip"] + if ip in Connections: + c = Connections[ip] + else: + c = YeelightConnection(ip) + Connections[ip] = c -def set_light(address, light, data): method = 'TCP' payload = {} transitiontime = 400 if "transitiontime" in data: - transitiontime = data["transitiontime"] * 100 + transitiontime = int(data["transitiontime"] * 100) for key, value in data.items(): if key == "on": if value: @@ -102,7 +111,11 @@ def set_light(address, light, data): elif key == "sat": payload["set_hsv"] = [int(light["state"]["hue"] / 182), int(value / 2.54), "smooth", transitiontime] elif key == "xy": - color = convert_xy(value[0], value[1], light["state"]["bri"]) + bri = light["state"]["bri"] + if rgb: + color = rgbBrightness(rgb, bri) + else: + color = convert_xy(value[0], value[1], bri) payload["set_rgb"] = [(color[0] * 65536) + (color[1] * 256) + color[2], "smooth", transitiontime] #according to docs, yeelight needs this to set rgb. its r * 65536 + g * 256 + b elif key == "alert" and value != "none": payload["start_cf"] = [ 4, 0, "1000, 2, 5500, 100, 1000, 2, 5500, 1, 1000, 2, 5500, 100, 1000, 2, 5500, 1"] @@ -111,7 +124,14 @@ def set_light(address, light, data): # see page 9 http://www.yeelight.com/download/Yeelight_Inter-Operation_Spec.pdf # check if hue wants to change brightness for key, value in payload.items(): - command(address["ip"], key, value) + try: + c.command(key, value) + except Exception as e: + if not c._music and c._connected: + c.disconnect() + raise e + if not c._music and c._connected: + c.disconnect() def get_light_state(address, light): #logging.info("name is: " + light["name"]) @@ -175,3 +195,119 @@ def get_light_state(address, light): state["colormode"] = "hs" tcp_socket.close() return state + +def enableMusic(ip, host_ip): + if ip in Connections: + c = Connections[ip] + if not c._music: + c.enableMusic(host_ip) + else: + c = YeelightConnection(ip) + Connections[ip] = c + c.enableMusic(host_ip) + +def disableMusic(ip): + if ip in Connections: # Else? LOL + Connections[ip].disableMusic() + +class YeelightConnection(object): + _music = False + _connected = False + _socket = None + _host_ip = "" + + def __init__(self, ip): + self._ip = ip + + def connect(self, simple = False): #Use simple when you don't need to reconnect music mode + self.disconnect() #To clean old socket + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(5) + self._socket.connect((self._ip, int(55443))) + if not simple and self._music: + self.enableMusic(self._host_ip) + else: + self._connected = True + + def disconnect(self): + self._connected = False + if self._socket: + self._socket.close() + self._socket = None + + def enableMusic(self, host_ip): + if self._connected and self._music: + raise AssertionError("Already in music mode!") + + self._host_ip = host_ip + + tempSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #Setup listener + tempSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + tempSock.settimeout(5) + + tempSock.bind(("", 0)) + port = tempSock.getsockname()[1] #Get listener port + + tempSock.listen(3) + + if not self._connected: + self.connect(True) #Basic connect for set_music + + self.command("set_music", [1, host_ip, port]) #MAGIC + self.disconnect() #Disconnect from basic mode + + while 1: + try: + conn, addr = tempSock.accept() + if addr[0] == self._ip: #Ignore wrong connections + tempSock.close() #Close listener + self._socket = conn #Replace socket with music one + self._connected = True + self._music = True + break + else: + try: + logging.info("Rejecting connection to the music mode listener from %s", self._ip) + conn.close() + except: + pass + except Exception as e: + tempSock.close() + raise ConnectionError("Yeelight with IP {} doesn't want to connect in music mode: {}".format(self._ip, e)) + + logging.info("Yeelight device with IP %s is now in music mode", self._ip) + + def disableMusic(self): + if not self._music: + return + + if self._socket: + self._socket.close() + self._socket = None + self._music = False + logging.info("Yeelight device with IP %s is no longer using music mode", self._ip) + + def send(self, data: bytes, flags: int = 0): + try: + if not self._connected: + self.connect() + self._socket.send(data, flags) + except Exception as e: + self._connected = False + raise e + + def recv(self, bufsize: int, flags: int = 0) -> bytes: + try: + if not self._connected: + self.connect() + return self._socket.recv(bufsize, flags) + except Exception as e: + self._connected = False + raise e + + def command(self, api_method, param): + try: + msg = json.dumps({"id": 1, "method": api_method, "params": param}) + "\r\n" + self.send(msg.encode()) + except Exception as e: + logging.warning("Yeelight command error: %s", e) \ No newline at end of file