dimanche 30 décembre 2018

Domotique: TUYA... de l'IoT pas cher ?!

En fait, j'avais un besoin... je voulais un thermostat autonome qui puisse piloter ma chaudière même si un jour mon serveur Jeedom devait avoir un problème. Ainsi je serais WAF ;-).
Je voulais donc un thermostat connecté sympa et connectable à Jeedom par un plug-in voir un script.
Dernière chose, je ne voulais pas me faire avoir à payer 100€ pour un relai connecté en fait :-(.
J'ai donc trouvé celui-ci parce que je savais aussi qu'il était potentiellement "hackable" pour avoir une connection local et sans cloud, et pour moi il supportait potentiellement le protocol Broadlink ou j'avais déjà le plugin sous jeedom, c'est le BECA BHT-6000GCLW :
Descriptif technique
Dimensions du produit (L x l x h)9 x 9 x 1,4 cm
RéférenceBHT-6000GCLW
TailleChaudière Chauffage
CouleurNoir
Tension220 Volts
Style d'écranNumérique
Fonctions spécialesSans fil
Composants inclus1*Thermostat 2*Vis 1*Guide de l'utilisateur 1*capteur de sol (2,5 m) (capteur de sol en option) 1*QC passé
Piles incluses ?Non
Batterie(s) / Pile(s) requise(s)Non

Il est encore sur Amazon (lien non sponsorisé bien sur ;-) :     https://www.amazon.fr/gp/product/B0793Q3F62/

Mais finalement, j'ai commencé par une déception :-(, après l'avoir reçu, je me suis rendu compte que ce n'était pas ce que je croyais, il n'était pas compatible Broadlink finalement :-(....

Mais il était bien connecté et wifi... donc j'ai fait des recherches, j'ai même contacté la société BECA pour avoir des infos et si une API existait... et ils ont répondu positivement par une API... Cloud :-(... ce que je voulais éviter. Cela néanmoins permis de savoir plus ce que je cherchais, j'ai compris que ce fabricant utilisait un framework complet (https://en.tuya.com/) :



Il est fourni à pleins d'autres fabricant et basé sur la même plateforme à base d'ESP8266 modifié  !!!! Et oui de l'ESP8266... j'aurais pu décider donc de le flasher et de changer le code interne du device mais j'ai voulu voir si certains n'avait pas essayer de se connecter en local finalement et sans le flasher surtout, je ne voulais pas réinventer la roue et je voulais garder le device le plus "intègre" possible ;-)

Et donc en cherchant une API TUYA "locale", j'ai trouvé plusieurs projets (et j'ai bien galèré finalement :-( !) et finalement je me suis attardé sur celui-ci qui avait été modifié en 2018 et en python (pratique pour mes scripts Jeedom ;-) :
https://pypi.org/project/pytuya/ (le code: https://github.com/clach04/python-tuya)

Pour mon cas, j'ai donc expérimenté cette lib python, mais ce ne fut pas simple, il fallait dans un premier temps récupérer la clé et l'id du device en question. Pour cela il faut récupérer les infos pendant l'enrôlement, vu que j'ai des rapsberry pi pour jeedom, j'ai pris la solution utilisant linux.
De plus, Il ne faut pas forcement utiliser l'application du fabricant, j'ai donc opté pour TuyaSmart:

et dans mon cas sur iOS :
https://itunes.apple.com/us/app/tuyasmart/id1034649547?mt=8
mais cela existe aussi sur Android :
https://play.google.com/store/apps/details?id=com.tuya.smart&hl=fr

Et donc j'ai suivi le processus suivant  qui fut finalement super simple mais j'ai mis du temps à comprendre comment trusté le certificat dans mon téléphone, le boulet ;-):

Get Local ID/KEY of a Tuya Device (on Raspbian)
  1. Add any devices you want to use with tuyapi to the Tuya Smart app.
  2. Install the CLI tool by running « npm i @tuyapi/cli -g ». If it returns an error, you may need to prefix the command with sudo. (Tip: using sudo to install global packages is not considered best practice. See this NPM article for some help.)
  3. Install AnyProxy by running « sudo npm i anyproxy -g ». Then run « anyproxy-ca » to create ca and run « sudo anyproxy-ca » to trust it  .
  4. Run tuya-cli list-app. It will print out a QR code; scan it with your phone and install the root certificate. 
  5. After installation, trust the installed root certificate from the phone.
  6. Configure the proxy on your phone with the parameters provided in the console.
  7. Open Tuya Smart and refresh the list of devices by "pulling down".
  8. A list of ID and key pairs should appear in the console.
  9. It's recommended to untrust the root certificate after you're done for security purposes.

Ensuite, j'ai installé la lib sur mon raspberry pi avec la commande suivante :


pi@raspberry:~ $ sudo pip install pytuya

Et j'ai créé un script pour jeedom (j'ai aussi galéré à comprendre les interfaces... il en manquait en fait pour mon besoin), voici la dernière version que j'ai mis en place pour moi (j'ai mis plusieurs soirée avant d'avoir ce que je voulais), c'est le contenu du fichier "tuya_api.py" que l'on voit après dans les screenshots Jeedom:

# install using : sudo pip install pytuya
# need a pytuya version with this new function :
#    def set_value(self, index, value):
#        """
#        Set int value of any index.
#
#       Args:
#            index(int): index to set
#            value(int): new value for the index
#        """
#        
#        payload = self.generate_payload(SET, {
#            index: value})
#        data = self._send_receive(payload)
#        return data
      
import pytuya
import sys
import argparse

def str2bool(v):
  return v.lower() in ("yes", "true", "t", "1")

parser = argparse.ArgumentParser()
parser.add_argument("-i","--index", type=str, required=True, help="DPS Index of TUYA device");
parser.add_argument("-v","--value", type=int, help="value to set (decimal value) of TUYA device");
parser.add_argument("-s","--status", type=str2bool, help="status to set to False or True of TUYA device");


args = parser.parse_args()
#print('index: %r' % args.index);
#print('value: %r' % args.value);
#print('status: %r' % args.status);


#******part to modify for your product/case***********
IP='IP of you device';
ID='your device ID';
KEY='your device key';
#*****************************************************

index = args.index
value = args.value
status = args.status

d = pytuya.Device(ID, IP , KEY, 'device')

# increase consigne for one degree
data = d.status()
#print('Dictionary %r' % data)
#example:
#Dictionary {u'devId': u'XXXXXXXXXXXXXXXXXXX', u'dps': {u'1': True, u'3': 41, u'2': 39, u'5': False, u'4': u'1', u'6': False, u'102': 0, u'103': u'1', u'104': True}}
# index values for BECA Thermostat: 
#    DPS_INDEX_ON            = '1'
#    DPS_INDEX_TEMPERATURE   = '3'
#    DPS_INDEX_SETPOINT      = '2'
#    DPS_INDEX_4_UNKNOWN     = '4'
#    DPS_INDEX_5_BOOL        = '5'
#    DPS_INDEX_6_BOOL        = '6'
#    DPS_INDEX_102_NUM       = '102'
#    DPS_INDEX_103_UNKNOW    = '103'
#    DPS_INDEX_104_BOOL      = '104'
    
if value is not None:
   data = d.set_value(index, value)  
elif status is not None:
   data = d.set_status(status, index)      
else:
   print('%r' % data['dps'][index])  # Show status or value 

Voici des example d'appel à mon script dans jeedom :

"tuya_api.py -i 2 -v #slider#" => pour le cas où je voulez régler la consigne du thermostat qui se trouve en index 2
"tuya_api.py -i 2" => juste pour lire la consigne du thermostat (toujours en index 2)
"tuya_api.py -i 3" => pour lire la température mesuré par le thermostat (en index 3)
"tuya_api.py -i 1 -s True" => pour allumer l'écran du thermostat (en index 1)
"tuya_api.py -i 1 -s False" => pour éteindre l'écran du thermostat (en index 1)
"tuya_api.py -i 1" => pour lire l'état de l'écran du thermostat (allumé ou éteint et en index 1 aussi)

Voici un screen shot dans jeedom pour ceux qui n'ont pas encore compris comment configurer un script dans jeedom ;-) :


Donc ce script permet de lire les status du device, de les mettre à jour quand c'est possible mais finalement dans mon cas, il manquait la possibilité aussi de mettre une valeur décimal à jour comme la consigne du thermostat.

Donc j'ai du regarder le code de la librairie et finalement j'ai du créer une nouvelle commande "set_value" lié à la class "Device" pour être le plus générique possible.
J'ai donc commencé à contribué et à faire un fork du projet PYTUYA: https://github.com/bozothegeek/python-tuya
A ce jour, ce n'est pas encore "releasé" (on est en 7.0.2) mais vous pouvez utiliser mon fork de la lib ;-)
Si vous voulez utiliser mon code à court terme, je conseille de faire ainsi :
1) Vous installé comme d'hab

pi@raspberry:~ $ sudo pip install pytuya

2) Vous récupérer mon fichier python modifié "___init__.py" ci-après:
je conseille de télécharger le projet en ligne de commande à partir de votre raspbian ainsi:
pi@raspberry:~ $ git fetch https://github.com/bozothegeek/python-tuya.git

# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices
# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U
#   SKYROKU SM-PW701U Wi-Fi Plug Smart Plug
#   Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa
#
# This would not exist without the protocol reverse engineering from
# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
#
# Tested with Python 2.7 and Python 3.6.1 only


import base64
from hashlib import md5
import json
import logging
import socket
import sys
import time
import colorsys

try:
    #raise ImportError
    import Crypto
    from Crypto.Cipher import AES  # PyCrypto
except ImportError:
    Crypto = AES = None
    import pyaes  # https://github.com/ricmoo/pyaes


version_tuple = (7, 0, 2)
version = version_string = __version__ = '%d.%d.%d' % version_tuple
__author__ = 'clach04'

log = logging.getLogger(__name__)
logging.basicConfig()  # TODO include function name/line numbers in log
#log.setLevel(level=logging.DEBUG)  # Debug hack!

log.info('Python %s on %s', sys.version, sys.platform)
if Crypto is None:
    log.info('Using pyaes version %r', pyaes.VERSION)
    log.info('Using pyaes from %r', pyaes.__file__)
else:
    log.info('Using PyCrypto %r', Crypto.version_info)
    log.info('Using PyCrypto from %r', Crypto.__file__)

SET = 'set'

PROTOCOL_VERSION_BYTES = b'3.1'

IS_PY2 = sys.version_info[0] == 2

class AESCipher(object):
    def __init__(self, key):
        #self.bs = 32  # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/
        self.bs = 16
        self.key = key
    def encrypt(self, raw):
        if Crypto:
            raw = self._pad(raw)
            cipher = AES.new(self.key, mode=AES.MODE_ECB)
            crypted_text = cipher.encrypt(raw)
        else:
            _ = self._pad(raw)
            cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key))  # no IV, auto pads to 16
            crypted_text = cipher.feed(raw)
            crypted_text += cipher.feed()  # flush final block
        #print('crypted_text %r' % crypted_text)
        #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text))
        crypted_text_b64 = base64.b64encode(crypted_text)
        #print('crypted_text_b64 (%d) %r' % (len(crypted_text_b64), crypted_text_b64))
        return crypted_text_b64
    def decrypt(self, enc):
        enc = base64.b64decode(enc)
        #print('enc (%d) %r' % (len(enc), enc))
        #enc = self._unpad(enc)
        #enc = self._pad(enc)
        #print('upadenc (%d) %r' % (len(enc), enc))
        if Crypto:
            cipher = AES.new(self.key, AES.MODE_ECB)
            raw = cipher.decrypt(enc)
            #print('raw (%d) %r' % (len(raw), raw))
            return self._unpad(raw).decode('utf-8')
            #return self._unpad(cipher.decrypt(enc)).decode('utf-8')
        else:
            cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key))  # no IV, auto pads to 16
            plain_text = cipher.feed(enc)
            plain_text += cipher.feed()  # flush final block
            return plain_text
    def _pad(self, s):
        padnum = self.bs - len(s) % self.bs
        return s + padnum * chr(padnum).encode()
    @staticmethod
    def _unpad(s):
        return s[:-ord(s[len(s)-1:])]


def bin2hex(x, pretty=False):
    if pretty:
        space = ' '
    else:
        space = ''
    if IS_PY2:
        result = ''.join('%02X%s' % (ord(y), space) for y in x)
    else:
        result = ''.join('%02X%s' % (y, space) for y in x)
    return result


def hex2bin(x):
    if IS_PY2:
        return x.decode('hex')
    else:
        return bytes.fromhex(x)

# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi
payload_dict = {
  "device": {
    "status": {
      "hexByte": "0a",
      "command": {"gwId": "", "devId": ""}
    },
    "set": {
      "hexByte": "07",
      "command": {"devId": "", "uid": "", "t": ""}
    },
    "prefix": "000055aa00000000000000",    # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte)
    "suffix": "000000000000aa55"
  }
}

class XenonDevice(object):
    def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10):
        """
        Represents a Tuya device.
        
        Args:
            dev_id (str): The device id.
            address (str): The network address.
            local_key (str, optional): The encryption key. Defaults to None.
            dev_type (str, optional): The device type.
                It will be used as key for lookups in payload_dict.
                Defaults to None.
            
        Attributes:
            port (int): The port to connect to.
        """
        self.id = dev_id
        self.address = address
        self.local_key = local_key
        self.local_key = local_key.encode('latin1')
        self.dev_type = dev_type
        self.connection_timeout = connection_timeout

        self.port = 6668  # default - do not expect caller to pass in

    def __repr__(self):
        return '%r' % ((self.id, self.address),)  # FIXME can do better than this

    def _send_receive(self, payload):
        """
        Send single buffer `payload` and receive a single buffer.
        
        Args:
            payload(bytes): Data to send.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        s.settimeout(self.connection_timeout)
        s.connect((self.address, self.port))
        s.send(payload)
        data = s.recv(1024)
        s.close()
        return data

    def generate_payload(self, command, data=None):
        """
        Generate the payload to send.

        Args:
            command(str): The type of command.
                This is one of the entries from payload_dict
            data(dict, optional): The data to be send.
                This is what will be passed via the 'dps' entry
        """
        json_data = payload_dict[self.dev_type][command]['command']

        if 'gwId' in json_data:
            json_data['gwId'] = self.id
        if 'devId' in json_data:
            json_data['devId'] = self.id
        if 'uid' in json_data:
            json_data['uid'] = self.id  # still use id, no seperate uid
        if 't' in json_data:
            json_data['t'] = str(int(time.time()))

        if data is not None:
            json_data['dps'] = data

        # Create byte buffer from hex data
        json_payload = json.dumps(json_data)
        #print(json_payload)
        json_payload = json_payload.replace(' ', '')  # if spaces are not removed device does not respond!
        json_payload = json_payload.encode('utf-8')
        log.debug('json_payload=%r', json_payload)

        if command == SET:
            # need to encrypt
            #print('json_payload %r' % json_payload)
            self.cipher = AESCipher(self.local_key)  # expect to connect and then disconnect to set new
            json_payload = self.cipher.encrypt(json_payload)
            #print('crypted json_payload %r' % json_payload)
            preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key
            #print('preMd5String %r' % preMd5String)
            m = md5()
            m.update(preMd5String)
            #print(repr(m.digest()))
            hexdigest = m.hexdigest()
            #print(hexdigest)
            #print(hexdigest[8:][:16])
            json_payload = PROTOCOL_VERSION_BYTES + hexdigest[8:][:16].encode('latin1') + json_payload
            #print('data_to_send')
            #print(json_payload)
            #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload))
            #print('json_payload  %r' % repr(json_payload))
            #print('json_payload len %r' % len(json_payload))
            #print(bin2hex(json_payload))
            self.cipher = None  # expect to connect and then disconnect to set new


        postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix'])
        #print('postfix_payload %r' % postfix_payload)
        #print('postfix_payload %r' % len(postfix_payload))
        #print('postfix_payload %x' % len(postfix_payload))
        #print('postfix_payload %r' % hex(len(postfix_payload)))
        assert len(postfix_payload) <= 0xff
        postfix_payload_hex_len = '%x' % len(postfix_payload)  # TODO this assumes a single byte 0-255 (0x00-0xff)
        buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + 
                          payload_dict[self.dev_type][command]['hexByte'] + 
                          '000000' +
                          postfix_payload_hex_len ) + postfix_payload
        #print('command', command)
        #print('prefix')
        #print(payload_dict[self.dev_type][command]['prefix'])
        #print(repr(buffer))
        #print(bin2hex(buffer, pretty=True))
        #print(bin2hex(buffer, pretty=False))
        #print('full buffer(%d) %r' % (len(buffer), buffer))
        return buffer
    
class Device(XenonDevice):
    def __init__(self, dev_id, address, local_key=None, dev_type=None):
        super(Device, self).__init__(dev_id, address, local_key, dev_type)
    
    def status(self):
        log.debug('status() entry')
        # open device, send request, then close connection
        payload = self.generate_payload('status')

        data = self._send_receive(payload)
        log.debug('status received data=%r', data)

        result = data[20:-8]  # hard coded offsets
        log.debug('result=%r', result)
        #result = data[data.find('{'):data.rfind('}')+1]  # naive marker search, hope neither { nor } occur in header/footer
        #print('result %r' % result)
        if result.startswith(b'{'):
            # this is the regular expected code path
            if not isinstance(result, str):
                result = result.decode()
            result = json.loads(result)
        elif result.startswith(PROTOCOL_VERSION_BYTES):
            # got an encrypted payload, happens occasionally
            # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
            # NOTE dps.2 may or may not be present
            result = result[len(PROTOCOL_VERSION_BYTES):]  # remove version header
            result = result[16:]  # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload
            cipher = AESCipher(self.local_key)
            result = cipher.decrypt(result)
            log.debug('decrypted result=%r', result)
            if not isinstance(result, str):
                result = result.decode()
            result = json.loads(result)
        else:
            log.error('Unexpected status() payload=%r', result)

        return result

    def set_status(self, on, switch=1):
        """
        Set status of the device to 'on' or 'off'.
        
        Args:
            on(bool):  True for 'on', False for 'off'.
            switch(int): The switch to set
        """
        # open device, send request, then close connection
        if isinstance(switch, int):
            switch = str(switch)  # index and payload is a string
        payload = self.generate_payload(SET, {switch:on})
        #print('payload %r' % payload)

        data = self._send_receive(payload)
        log.debug('set_status received data=%r', data)

        return data

    def set_value(self, index, value):
        """
        Set int value of any index.

        Args:
            index(int): index to set
            value(int): new value for the index
        """
        # open device, send request, then close connection
        if isinstance(index, int):
            index = str(index)  # index and payload is a string

        payload = self.generate_payload(SET, {
            index: value})
        
        data = self._send_receive(payload)
        
        return data

    def turn_on(self, switch=1):
        """Turn the device on"""
        self.set_status(True, switch)

    def turn_off(self, switch=1):
        """Turn the device off"""
        self.set_status(False, switch)

    def set_timer(self, num_secs):
        """
        Set a timer.
        
        Args:
            num_secs(int): Number of seconds
        """
        # FIXME / TODO support schemas? Accept timer id number as parameter?

        # Dumb heuristic; Query status, pick last device id as that is probably the timer
        status = self.status()
        devices = status['dps']
        devices_numbers = list(devices.keys())
        devices_numbers.sort()
        dps_id = devices_numbers[-1]

        payload = self.generate_payload(SET, {dps_id:num_secs})

        data = self._send_receive(payload)
        log.debug('set_timer received data=%r', data)
        return data

class OutletDevice(Device):
    def __init__(self, dev_id, address, local_key=None):
        dev_type = 'device'
        super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type)

class BulbDevice(Device):
    DPS_INDEX_ON         = '1'
    DPS_INDEX_MODE       = '2'
    DPS_INDEX_BRIGHTNESS = '3'
    DPS_INDEX_COLOURTEMP = '4'
    DPS_INDEX_COLOUR     = '5'

    DPS             = 'dps'
    DPS_MODE_COLOUR = 'colour'
    DPS_MODE_WHITE  = 'white'

    def __init__(self, dev_id, address, local_key=None):
        dev_type = 'device'
        super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type)

    @staticmethod
    def _rgb_to_hexvalue(r, g, b):
        """
        Convert an RGB value to the hex representation expected by tuya.
        
        Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format:
        rrggbb0hhhssvv
        
        While r, g and b are just hexadecimal values of the corresponding
        Red, Green and Blue values, the h, s and v values (which are values
        between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively.
        
        Args:
            r(int): Value for the colour red as int from 0-255.
            g(int): Value for the colour green as int from 0-255.
            b(int): Value for the colour blue as int from 0-255.
        """
        rgb = [r,g,b]
        hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255)

        hexvalue = ""
        for value in rgb:
            temp = str(hex(int(value))).replace("0x","")
            if len(temp) == 1:
                temp = "0" + temp
            hexvalue = hexvalue + temp

        hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)]
        hexvalue_hsv = ""
        for value in hsvarray:
            temp = str(hex(int(value))).replace("0x","")
            if len(temp) == 1:
                temp = "0" + temp
            hexvalue_hsv = hexvalue_hsv + temp
        if len(hexvalue_hsv) == 7:
            hexvalue = hexvalue + "0" + hexvalue_hsv
        else:
            hexvalue = hexvalue + "00" + hexvalue_hsv

        return hexvalue

    @staticmethod
    def _hexvalue_to_rgb(hexvalue):
        """
        Converts the hexvalue used by tuya for colour representation into
        an RGB value.
        
        Args:
            hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue()
        """
        r = int(hexvalue[0:2], 16)
        g = int(hexvalue[2:4], 16)
        b = int(hexvalue[4:6], 16)

        return (r, g, b)

    @staticmethod
    def _hexvalue_to_hsv(hexvalue):
        """
        Converts the hexvalue used by tuya for colour representation into
        an HSV value.
        
        Args:
            hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue()
        """
        h = int(hexvalue[7:10], 16) / 360
        s = int(hexvalue[10:12], 16) / 255
        v = int(hexvalue[12:14], 16) / 255

        return (h, s, v)

    def set_colour(self, r, g, b):
        """
        Set colour of an rgb bulb.

        Args:
            r(int): Value for the colour red as int from 0-255.
            g(int): Value for the colour green as int from 0-255.
            b(int): Value for the colour blue as int from 0-255.
        """
        if not 0 <= r <= 255:
            raise ValueError("The value for red needs to be between 0 and 255.")
        if not 0 <= g <= 255:
            raise ValueError("The value for green needs to be between 0 and 255.")
        if not 0 <= b <= 255:
            raise ValueError("The value for blue needs to be between 0 and 255.")

        print(BulbDevice)
        hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b)

        payload = self.generate_payload(SET, {
            self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR,
            self.DPS_INDEX_COLOUR: hexvalue})
        data = self._send_receive(payload)
        return data

    def set_white(self, brightness, colourtemp):
        """
        Set white coloured theme of an rgb bulb.

        Args:
            brightness(int): Value for the brightness (25-255).
            colourtemp(int): Value for the colour temperature (0-255).
        """
        if not 25 <= brightness <= 255:
            raise ValueError("The brightness needs to be between 25 and 255.")
        if not 0 <= colourtemp <= 255:
            raise ValueError("The colour temperature needs to be between 0 and 255.")

        payload = self.generate_payload(SET, {
            self.DPS_INDEX_MODE: self.DPS_MODE_WHITE,
            self.DPS_INDEX_BRIGHTNESS: brightness,
            self.DPS_INDEX_COLOURTEMP: colourtemp})

        data = self._send_receive(payload)
        return data

    def set_brightness(self, brightness):
        """
        Set the brightness value of an rgb bulb.

        Args:
            brightness(int): Value for the brightness (25-255).
        """
        if not 25 <= brightness <= 255:
            raise ValueError("The brightness needs to be between 25 and 255.")

        payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness})
        data = self._send_receive(payload)
        return data

    def set_colourtemp(self, colourtemp):
        """
        Set the colour temperature of an rgb bulb.

        Args:
            colourtemp(int): Value for the colour temperature (0-255).
        """
        if not 0 <= colourtemp <= 255:
            raise ValueError("The colour temperature needs to be between 0 and 255.")

        payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp})
        data = self._send_receive(payload)
        return data

    def brightness(self):
        """Return brightness value"""
        return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS]

    def colourtemp(self):
        """Return colour temperature"""
        return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP]

    def colour_rgb(self):
        """Return colour as RGB value"""
        hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR]
        return BulbDevice._hexvalue_to_rgb(hexvalue)

    def colour_hsv(self):
        """Return colour as HSV value"""
        hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR]
        return BulbDevice._hexvalue_to_hsv(hexvalue)

    def state(self):
        status = self.status()
        state = {
            'is_on'      : status[self.DPS][self.DPS_INDEX_ON],
            'mode'       : status[self.DPS][self.DPS_INDEX_MODE],
            'brightness' : status[self.DPS][self.DPS_INDEX_BRIGHTNESS],
            'colourtemp' : status[self.DPS][self.DPS_INDEX_COLOURTEMP],
            'colour'     : status[self.DPS][self.DPS_INDEX_COLOUR],
            }
        return state

3) puis le recopier de votre répertoire home vers le répertoire de la lib installée, vous pouvez vérifiez le chemin avant (regardez le chemin en rouge dans mon exemple) :

pi@raspberry:~ $ sudo pip show pytuya
Name: pytuya
Version: 7.0.2
Summary: Python interface to ESP8266MOD WiFi smart devices from Shenzhen Xenon
Home-page: https://github.com/clach04/python-tuya
Author: clach04
Author-email: UNKNOWN
License: MIT
Location: /usr/local/lib/python2.7/dist-packages
Requires: pyaes

Puis allez dans le projet téléchargé et copiez donc la version que vous aurez en local avec la commande sudo vers le répertoire identifié dans la commande précédente:


pi@raspberry:~ $ cd python-tuya/pytuya 

pi@raspberry:~ $ sudo cp __init__.py /usr/local/lib/python2.7/dist-packages/pytuya



Conclusion:
Dans tout les cas, si vous avez un autre device "Tuya", c'est toujours la même procedure, et pour voir ce que peux faire votre device, vous pouvez déjà regarder ce que retourne la commande "data = d.status()" qui en général renvoi un JSON avec des infos en binary, string ou integer. Et ainsi, vous pouvez faire vous même le reverse engineering de votre device.
Allez voir aussi le wiki pour plus d'info sur les devices "Tuya": https://github.com/clach04/python-tuya/wiki

J'espère que mes recherches pourront vous aider dans votre projet.

Enjoy !!!!


samedi 29 décembre 2018

Domotique: XBOX API & scripts dans Jeedom

En fait, j'ai eu besoin de pouvoir faire des statistiques sur les jeux et l'utilisation des consoles par mes enfants. Finalement, j'ai jamais vraiment utilisé cela pour les surveiller mais plus pour le challenge de voir si une api existait pour cela... et je suis tombé sur cela qui n'est pas une API officiel mais qui m'a satisfait pour mes besoins : https://xboxapi.com/


...qui marche sur xbox one mais je pense aussi sur xbox 360 ou pc si vous avez un compte xbox live ! (je vous laisse tester ces cas mais j'ai testé que sur xbox one et avec plusieurs comptes ;-)

En fait, cette API est gratuite et si votre utilisation est limité à 60 requêtes par heure et par compte xbox live par compte (je crois ;-). Pour l'instant, je n'ai pu associé qu'un seul compte xbox live par compte xbox API. 

Il faut créer un compte sur ce site et y lier votre comptre xbox live, il faudra y récupérer 2 infos, le "XBOX Profile User ID" et  le "XboxAPI API Key" comme on peut le voir sur un de mes comptes :



Ensuite, on va pouvoir utiliser des scripts pour récupérer des infos de l'API, j'ai essayé de faire un script générique (plus ou moins ;-) et donc avec en passage de paramètre comme la clé et le profile, et aussi dans certains cas, je rajoute l'info que je cherche dans le résultat de l'API, je vous expliquerais plus loin le principe.

Donc on peut passer sur Jeedom dès lors que le compte est créé, lié au xbox live account et que l'on a la clé et le profile....

Voici donc le script en perl pour cette fois (me demandez pas pourquoi ;-) :


#!/usr/bin/perl -s
use LWP::UserAgent;
use Net::Ping;
use JSON qw( decode_json ); # need to install json parsing library using this command: sudo apt-get install libjson-pp-perl

no warnings 'uninitialized';

#HTTP example command
#https://xboxapi.com/v2/profile
#https://xboxapi.com/v2/{xuid}/presence
#see more command from here: https://xboxapi.com/documentation

$ua=LWP::UserAgent->new; $ua->timeout(5);

if ($h==1 or (not defined $k) or (not defined $c) or (not defined $t))
{
  print("
  Script to use with https://xboxapi.com
  
  Need to install json parsing library using this command: sudo apt-get install libjson-pp-perl
  
  Parameters are: 
  -h for help 
  -k= (Mandatory)
  -c= (Mandatory)
  -t= (Mandatory)
  -p= (Optional but requested for some commands)
  
  #HTTP example command from XBOXAPI
  #https://xboxapi.com/v2/profile
  #https://xboxapi.com/v2/{xuid}/presence
  #see more command from here: https://xboxapi.com/documentation
  ");
}
else
{  
      #print("XBOX API Key=$k\n");
      #print("command=$c\n");
      
      if (defined $p)
      { 
        #print("XBOX profile User ID=$p\n");
        #X-AUTH token to get from your account of XBOXAPI, you need to have/create account from https://xboxapi.com
        $res=$ua->get("https://xboxapi.com/v2/$p/$c",'X-AUTH' => $k);
      }
      else
      {
        #X-AUTH token to get from your account of XBOXAPI, you need to have/create account from https://xboxapi.com
        $res=$ua->get("https://xboxapi.com/v2/$c",'X-AUTH' => $k);
      }

        unless ($res->is_success) { warn $res->status_line };

      #json parsing
      $decoded = decode_json($res->content);
      
      #print("tag to parse=$t\n");
      if((substr $t, 0, 1) ne '{')
      {
         #try to add { and } to help parsing of the string if missing
         $t = '{'.$t.'}';        
      }
      my $cmd = '$result = $decoded->'. $t. ';';
      eval "$cmd";
      print $result;
}

Et comme vous pouvez le voir dans les codes et les commentaires, il faudra bien s'assurer d'avoir installé la dépendance suivante et de cette manière : "sudo apt-get install libjson-pp-perl" pour gérer au mieux le JSON que l'on doit parser et venant de XBOXAPI.COM.

Dans jeedom, il faut juste créer un script "info" perl avec le code que j'ai donné (dans mon cas, j'ai appelé le script "xboxapi.perl", voici un screenshot où j'ai juste viré ma clé et mon id de profil bien sur ;-)



Il y a différents paramètres mais voici des exemples d'appel de ce script qui parlerons d'eux mêmes.

Pour savoir si le compte xbox live est "Online" ou pas : 

xboxapi.perl -k="votre API key" -p="votre profile id" -c=presence -t=state

Pour connaitre le gamerTag du profil : 

xboxapi.perl -k="votre API key" -c=profile -t=gamerTag

Pour savoir quel est le jeu en cours ou si on se trouve dans le menu de la xbox : 

xboxapi.perl -k="votre API key" -p="votre profile id" c=presence -t={devices}[0]{titles}[1]{name}

Pour connaitre le status du jeu (Dans Fortnite, vous aurez le nombre de gens restant sur la map par exemple ;-) : 

xboxapi.perl -k="votre API key" -p="votre profile id" -c=presence -t={devices}[0]{titles}[1]{activity}{richPresence}

Si vous voulez parser d'autres infos, il faudra aller voir la doc ou appeler l'API en direct et regarder ce que l'on trouve dans le JSON renvoyé.

Voici un exemple de JSON suite à une commande "presence" (j'ai mis des X pour les ids que je ne veux pas révéler):


{"xuid":XXXXXXXXXXXXXXXX,"state":"Online","devices":[{"type":"XboxOne","titles":[{"id":XXXXXXXX,"name":"Accueil","placement":"Background","state":"Active","lastModified":"2018-12-29T16:57:30.7121865Z"},{"id":XXXXXXXXXXX,"activity":{"richPresence":"CLASS\u00c9 - Manche 6\/6 (d\u00e9cisive)"},"name":"Tom Clancy's Rainbow Six Siege","placement":"Full","state":"Active","lastModified":"2018-12-29T16:57:30.7121865Z"}]}],"cloaked":true}


Conclusion:

Voilà, jespère que cela aidera certains à jouer avec cette API bien sympa et qui permet d'intégrer un peu plus encore nos consoles de jeu ;-)

On pouvait déjà utiliser l'UPNP/DLNA pour savoir si les consoles sont allumées (un simple PING ne marchait pas toujours :-( ) mais avec ce script et cette API, on peut avoir plus d'info... On pourrait imaginer faire un contrôle parental avec nos boxes domotiques en Jeedom ou autre utilisant des scripts perl.

Enjoy !!!