Source code for sunix_ledstrip_controller_client.client

"""
Example usage of the LEDStripControllerClient can be found in the example.py file
"""
import datetime
import socket
from socket import AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, SO_BROADCAST

from sunix_ledstrip_controller_client.controller import Controller
from sunix_ledstrip_controller_client.functions import FunctionId
from sunix_ledstrip_controller_client.packets import TransitionType
from sunix_ledstrip_controller_client.packets.requests import (
    StatusRequest, SetPowerRequest, UpdateColorRequest, SetFunctionRequest, SetCustomFunctionRequest, GetTimeRequest,
    SetTimeRequest)
from sunix_ledstrip_controller_client.packets.responses import (
    StatusResponse, GetTimeResponse)


[docs]class LEDStripControllerClient: """ This class is the main interface for controlling all devices """ _discovery_port = 48899 _discovery_message = b'HF-A11ASSISTHREAD' def __init__(self): """ Creates a new client object """
[docs] def discover_controllers(self) -> [Controller]: """ Sends a broadcast message to the local network. Listening devices will respond to this broadcast with their self description :return: a list of devices """ discovered_controllers = [] cs = socket.socket(AF_INET, SOCK_DGRAM) cs.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) cs.setsockopt(SOL_SOCKET, SO_BROADCAST, 1) # send a local broadcast via udp with a "magic packet" cs.sendto(self._discovery_message, ('255.255.255.255', self._discovery_port)) cs.setblocking(True) cs.settimeout(1) try: # TODO: allow multiple controller detection data, address = cs.recvfrom(4096) # print("Received message: \"%s\"" % data) # print("Address: " + address[0]) message = data.decode() # print(message) # parse received message data = str.split(message, ",") # check validity if len(data) == 3: # extract data ip = data[0] hw_id = data[1] model = data[2] # create a Controller object representation controller = Controller(ip, Controller.DEFAULT_PORT, hw_id, model) print(controller) discovered_controllers.append(controller) return discovered_controllers except socket.timeout: return discovered_controllers
[docs] def get_time(self, controller: Controller) -> datetime: """ Receives the current time of the controller :param controller: the controller to use :return: the current time of the controller """ request = GetTimeRequest() data = request.get_data() response_data = self._send_data(controller.get_host(), controller.get_port(), data, True) # parse and check validity of response data status_response = GetTimeResponse(response_data).get_response() dt = datetime.datetime( status_response["year"] + 2000, status_response["month"], status_response["day"], status_response["hour"], status_response["minute"], status_response["second"] ) return dt
[docs] def set_time(self, controller: Controller, date_time: datetime) -> None: """ Sets the internal time of the controller :param controller: the controller to use :param date_time: the time to set """ request = SetTimeRequest() data = request.get_data(date_time) self._send_data(controller.get_host(), controller.get_port(), data)
[docs] def update_state(self, controller: Controller) -> None: """ Updates the state of the passed in controller :param controller: the controller to update """ request = StatusRequest() data = request.get_data() response_data = self._send_data(controller.get_host(), controller.get_port(), data, True) # parse and check validity of response data status_response = StatusResponse(response_data).get_response() # update the controller values from the response controller._power_state = status_response["power_status"] controller._rgbww = [ status_response["red"], status_response["green"], status_response["blue"], status_response["warm_white"], status_response["cold_white"] ]
[docs] def turn_on(self, controller: Controller) -> None: """ Turns on a controller :param controller: the controller to turn on """ request = SetPowerRequest() data = request.get_data(True) self._send_data(controller.get_host(), controller.get_port(), data) self.update_state(controller)
[docs] def turn_off(self, controller: Controller) -> None: """ Turns on a controller :param controller: the controller to turn on """ request = SetPowerRequest() data = request.get_data(False) self._send_data(controller.get_host(), controller.get_port(), data) self.update_state(controller)
[docs] def set_rgbww(self, controller: Controller, red: int, green: int, blue: int, warm_white: int, cold_white: int) -> None: """ Sets rgbww values for the specified controller. :param controller: the controller to set the specified values on :param red: red intensity (0..255) :param green: green intensity (0..255) :param blue: blue intensity (0..255) :param warm_white: warm_white: warm white intensity (0..255) :param cold_white: cold white intensity (0..255) """ self._validate_color((red, green, blue, warm_white, cold_white), 5) request = UpdateColorRequest() data = request.get_rgbww_data(red, green, blue, warm_white, cold_white) self._send_data(controller.get_host(), controller.get_port(), data) self.update_state(controller)
[docs] def set_rgb(self, controller: Controller, red: int, green: int, blue: int) -> None: """ Sets rgbw values for the specified controller. :param controller: the controller to set the specified values on :param red: red intensity (0..255) :param green: green intensity (0..255) :param blue: blue intensity (0..255) """ self._validate_color((red, green, blue), 3) request = UpdateColorRequest() data = request.get_rgb_data(red, green, blue) self._send_data(controller.get_host(), controller.get_port(), data) self.update_state(controller)
[docs] def set_ww(self, controller: Controller, warm_white: int, cold_white: int) -> None: """ Sets warm white value for the specified controller. :param controller: the controller to set the specified values on :param warm_white: warm white intensity (0..255) :param cold_white: cold white intensity (0..255) """ self._validate_color((warm_white, cold_white), 2) request = UpdateColorRequest() data = request.get_ww_data(warm_white, cold_white) self._send_data(controller.get_host(), controller.get_port(), data) self.update_state(controller)
[docs] def get_function_list(self) -> [FunctionId]: """ :return: a list of all supported functions """ return list(FunctionId)
[docs] def set_function(self, controller: Controller, function_id: FunctionId, speed: int): """ Sets a function on the specified controller :param controller: the controller to set the function on :param function_id: Function ID :param speed: function speed [0..255] 0 is slow, 255 is fast """ request = SetFunctionRequest() data = request.get_data(function_id, speed) self._send_data(controller.get_host(), controller.get_port(), data)
[docs] def set_custom_function(self, controller: Controller, color_values: [(int, int, int, int)], speed: int, transition_type: TransitionType = TransitionType.Gradual): """ Sets a custom function on the specified controller :param controller: the controller to set the function on :param color_values: a list of up to 16 color tuples of the form (red, green, blue) or (red, green, blue, unknown). I couldn't figure out what the last parameter is used for so the rgb is a shortcut. :param transition_type: the transition type between colors :param speed: function speed [0..255] 0 is slow, 255 is fast """ for color in color_values: self._validate_color(color, len(color)) request = SetCustomFunctionRequest() data = request.get_data(color_values, speed, transition_type) self._send_data(controller.get_host(), controller.get_port(), data)
@staticmethod def _send_data(host: str, port: int, data, wait_for_response: bool = False) -> bytearray or None: """ Sends a binary data request to the specified host and port. :param host: destination host :param port: destination port :param data: the binary(!) data to send """ try: s = socket.socket() s.connect((host, port)) s.send(data) if wait_for_response: s.setblocking(True) s.settimeout(1) data = s.recv(2048) return data else: return None except socket.timeout: print("timeout") @staticmethod def _validate_color(color: (int, int, int), color_channels: int) -> None: """ Validates an int tuple that is meant to represent a color. If the color is valid this method will not do anything. There is no return value to check, the method will raise an Exception if necessary. :param color: the color tuple to validate :param color_channels: the expected amount of color channels in this color """ if len(color) != color_channels: raise ValueError( "Invalid amount of colors in color tuple. Expected " + str(color_channels) + ", got: " + str( len(color))) for color_channel in color: if color_channel < 0 or color_channel > 255: raise ValueError("Invalid color range! Expected 0-255, got: " + str(color_channel))