1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#! /usr/bin/env python3
import random
import time
class Remote:
_remote_type_alias_map = {
'fut089': 'rgb+cct'
}
_remote_type_parameters_map = {
'rgbw': {
'retries': 10,
'delay': 0.001,
'channels': [9, 40, 71],
'syncword': [0x258B, 0x147A],
'features': [
'can_set_brightness',
'has_brightness',
'has_white',
'has_color'
|
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#! /usr/bin/env python3
import random
import time
class Remote:
_remote_type_alias_map = {
'fut089': 'rgb+cct'
}
_remote_type_parameters_map = {
'rgbw': {
'retries': 10,
'delay': 0.05,
'channels': [9, 40, 71],
'syncword': [0x258B, 0x147A],
'features': [
'can_set_brightness',
'has_brightness',
'has_white',
'has_color'
|
︙ | | | ︙ | |
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
'syncword': [0xAA55, 0x50A0]
}
}
def __init__(self, radio, remote_type, remote_id, message_id = None, config = None):
# Pull in the config for this remote type
self._config = self._get_type_parameters(remote_type)
# Allow the user to specify some more parameters
if config is not None:
self._config.update(config)
# Store parameters
self._radio = radio
|
>
|
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
'syncword': [0xAA55, 0x50A0]
}
}
def __init__(self, radio, remote_type, remote_id, message_id = None, config = None):
# Pull in the config for this remote type
self._config = self._get_type_parameters(remote_type)
self._config['radio_queue'] = '__DEFAULT__'
# Allow the user to specify some more parameters
if config is not None:
self._config.update(config)
# Store parameters
self._radio = radio
|
︙ | | | ︙ | |
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
def _debug(self, message):
if 'debug_log_command' in self._config:
self._config['debug_log_command'](message)
return None
def _get_type_parameters(self, remote_type):
config = self._remote_type_parameters_map[remote_type]
# Supply default config values
if 'retries' not in config:
config['retries'] = 3
if 'delay' not in config:
config['delay'] = 0.1
|
>
|
|
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
def _debug(self, message):
if 'debug_log_command' in self._config:
self._config['debug_log_command'](message)
return None
def _get_type_parameters(self, remote_type):
config = {}
config.update(self._remote_type_parameters_map[remote_type])
# Supply default config values
if 'retries' not in config:
config['retries'] = 3
if 'delay' not in config:
config['delay'] = 0.1
|
︙ | | | ︙ | |
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
button_name = button_name + ':' + str(zone)
# Look up the button
button_id = self._config['button_map'][button_name]
# Compute message body
body = [zone, button_id, message_id]
# Compute message trailer
## Include a CRC, for good measure ?
crc = len(header) + len(body) + 1
for byte in header + body:
crc = crc + byte
crc = crc & 0xff
trailer = [crc]
message = header + body + trailer
return message
def _parse_button_message_cct(self, button_message):
button_info = {}
# Verify the header -- if it is not valid, return None
|
>
>
>
|
|
|
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
button_name = button_name + ':' + str(zone)
# Look up the button
button_id = self._config['button_map'][button_name]
# Compute message body
body = [zone, button_id, message_id]
# Compute the whole message so far
message = header + body
# Compute message trailer
## Include a CRC, for good measure ?
crc = len(message) + 1
for byte in header + body:
crc = crc + byte
crc = crc & 0xff
trailer = [crc]
message = message + trailer
return message
def _parse_button_message_cct(self, button_message):
button_info = {}
# Verify the header -- if it is not valid, return None
|
︙ | | | ︙ | |
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
|
def _get_next_message_id(self):
# Determine next message ID
self._message_id = (self._message_id + 1) & 0xff
return self._message_id
def _send_button(self, button_info):
# Configure radio parameters
self._radio.set_syncword(self._config['syncword'])
# Include the remote ID unless one was supplied
if 'remote_id' not in button_info:
button_info['remote_id'] = self._id
# Get the next message ID for this remote
if 'message_id' not in button_info:
message_id = self._get_next_message_id()
|
<
<
<
|
372
373
374
375
376
377
378
379
380
381
382
383
384
385
|
def _get_next_message_id(self):
# Determine next message ID
self._message_id = (self._message_id + 1) & 0xff
return self._message_id
def _send_button(self, button_info):
# Include the remote ID unless one was supplied
if 'remote_id' not in button_info:
button_info['remote_id'] = self._id
# Get the next message ID for this remote
if 'message_id' not in button_info:
message_id = self._get_next_message_id()
|
︙ | | | ︙ | |
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
|
else:
delay = self._config['delay']
if 'retries' in button_info:
retries = button_info['retries']
else:
retries = self._config['retries']
self._debug("Sending {}={} n={} times with a {}s delay".format(button_info, message, retries, delay))
self._radio.multi_transmit(message, self._config['channels'], retries, delay)
return True
def _set_brightness(self, brightness, zone = None):
if zone is None:
message = {'button': 'set_brightness'}
else:
|
|
|
|
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
|
else:
delay = self._config['delay']
if 'retries' in button_info:
retries = button_info['retries']
else:
retries = self._config['retries']
self._debug("Sending {}={} n={} times with a {}s delay to queue {}".format(button_info, message, retries, delay, self._config['radio_queue']))
self._radio.multi_transmit(message, self._config['channels'], retries, delay, syncword = self._config['syncword'], submit_queue = self._config['radio_queue'])
return True
def _set_brightness(self, brightness, zone = None):
if zone is None:
message = {'button': 'set_brightness'}
else:
|
︙ | | | ︙ | |
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
|
use_max_button = True
if use_max_button:
self._debug("[INITIAL] Going to max {}".format(button_prefix))
getattr(self, "_max_{}".format(button_prefix))(zone)
else:
# Otherwise, step it
for step in range(initial_steps):
self._debug("[INITIAL] Stepping {} {}".format(button_prefix, initial_direction))
self._send_button({'button': "{}_{}".format(button_prefix, initial_direction)})
# Now that we have forced the value to the extreme, move in
# steps from that value to the desired value
if initial_value < target_value:
final_steps = target_value - initial_value
else:
final_steps = initial_value - target_value
for step in range(final_steps):
self._debug("[FINAL] Stepping {} {}".format(button_prefix, final_direction))
self._send_button({'button': "{}_{}".format(button_prefix, final_direction)})
return True
def _step_brightness(self, brightness, brightness_min, brightness_max, zone = None):
# Select the appropriate zone before sending the steps
# to ensure they reach the correct bulbs
self.on(zone, try_hard = False)
return self._step_value(brightness, brightness_min, brightness_max, 'brightness', zone)
def _step_temperature(self, temperature, temperature_min, temperature_max, zone = None):
# Select the appropriate zone before sending the steps
# to ensure they reach the correct bulbs
self.on(zone, try_hard = False)
return self._step_value(temperature, temperature_min, temperature_max, 'temperature', zone)
def _max_brightness(self, zone = None):
if zone is None:
message = {'button': 'max'}
else:
message = {
|
>
>
>
|
>
>
>
|
<
<
|
<
<
|
|
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
|
use_max_button = True
if use_max_button:
self._debug("[INITIAL] Going to max {}".format(button_prefix))
getattr(self, "_max_{}".format(button_prefix))(zone)
else:
# Otherwise, step it
step_command = {'button': "{}_{}".format(button_prefix, initial_direction)}
if zone is not None:
step_command['zone'] = zone
for step in range(initial_steps):
self._debug("[INITIAL] Stepping {} {}".format(button_prefix, initial_direction))
self._send_button(step_command)
# Now that we have forced the value to the extreme, move in
# steps from that value to the desired value
if initial_value < target_value:
final_steps = target_value - initial_value
else:
final_steps = initial_value - target_value
step_command = {'button': "{}_{}".format(button_prefix, final_direction)}
if zone is not None:
step_command['zone'] = zone
for step in range(final_steps):
self._debug("[FINAL] Stepping {} {}".format(button_prefix, final_direction))
self._send_button(step_command)
return True
def _step_brightness(self, brightness, brightness_min, brightness_max, zone = None):
self.on(zone)
return self._step_value(brightness, brightness_min, brightness_max, 'brightness', zone)
def _step_temperature(self, temperature, temperature_min, temperature_max, zone = None):
self.on(zone)
return self._step_value(temperature, temperature_min, temperature_max, 'temperature', zone)
def _max_brightness(self, zone = None):
if zone is None:
message = {'button': 'max'}
else:
message = {
|
︙ | | | ︙ | |
705
706
707
708
709
710
711
|
# If the remote has no control over the temperature this
# query gets a null response
if 'temperature_input_range' not in self._config:
return None
# Otherwise return with what we accept as temperature ranges
return self._config['temperature_input_range']
|
>
>
>
|
709
710
711
712
713
714
715
716
717
718
|
# If the remote has no control over the temperature this
# query gets a null response
if 'temperature_input_range' not in self._config:
return None
# Otherwise return with what we accept as temperature ranges
return self._config['temperature_input_range']
def get_radio(self):
return self._radio
|