226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
|
def __init__(self, spi_bus, spi_dev, config = None):
    """Open the SPI device and prepare the radio object.

    Parameters:
        spi_bus: bus number handed to ``SpiDev.open``.
        spi_dev: device number handed to ``SpiDev.open``.
        config: optional configuration dictionary handed to ``configure``.

    Raises:
        ValueError: if ``self._register_map`` does not hold exactly 53 entries.
    """
    spi = spidev.SpiDev()
    spi.open(spi_bus, spi_dev)
    self._spi = spi
    self._dequeue_thread = None
    self._syncword = None
    # The queue has to exist before configure() runs: configure() may start
    # the dequeue thread, and that thread walks self._channel_queue at once.
    self._channel_queue = {}
    self.configure(config)
    if len(self._register_map) != 53:
        raise ValueError('Inconsistent register map!')
    return None
def __del__(self):
    """Close the SPI handle when the object is destroyed.

    The handle is looked up defensively because ``__del__`` also runs for
    objects whose ``__init__`` failed before ``_spi`` was assigned.
    """
    spi = getattr(self, '_spi', None)
    if spi is not None:
        spi.close()
def _debug(self, message):
    """Hand *message* to the configured debug callback, if one is set.

    The callback is the value stored under ``'debug_log_command'`` in the
    configuration; nothing happens when there is no configuration or no
    such key. Always returns None.
    """
    settings = self._config
    if settings is None:
        return None
    if 'debug_log_command' not in settings:
        return None
    callback = settings['debug_log_command']
    callback(message)
    return None
|
|
|
|
>
>
|
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
def __init__(self, spi_bus, spi_dev, config = None):
    """Open the SPI device and prepare the radio object.

    Parameters:
        spi_bus: bus number handed to ``SpiDev.open``.
        spi_dev: device number handed to ``SpiDev.open``.
        config: optional configuration dictionary; it is installed with
            ``configure(config, update = False)``.

    Raises:
        ValueError: if ``self._register_map`` does not hold exactly 53 entries.
    """
    spi = spidev.SpiDev()
    spi.open(spi_bus, spi_dev)
    self._spi = spi
    self._dequeue_thread = None
    self._last_syncword = None
    # The software queue has to exist before configure() runs: configure()
    # may start the dequeue thread, which iterates it immediately.
    self._software_tx_queue = {}
    self.configure(config, update = False)
    if len(self._register_map) != 53:
        raise ValueError('Inconsistent register map!')
    return None
def __del__(self):
    """Flag the software queue as unused and close the SPI handle.

    Both attributes are looked up defensively because ``__del__`` also runs
    for objects whose ``__init__`` failed before they were assigned.
    """
    config = getattr(self, '_config', None)
    if config is not None:
        self._debug('Deleting object')
        # Presumably this is what _should_use_queue() consults, letting the
        # dequeue thread wind down -- confirm against that helper.
        config['use_software_tx_queue'] = False
    spi = getattr(self, '_spi', None)
    if spi is not None:
        spi.close()
def _debug(self, message):
    """Forward *message* to the ``'debug_log_command'`` callback, if any.

    Does nothing when the configuration is None or lacks that key.
    Always returns None.
    """
    config = self._config
    has_logger = config is not None and 'debug_log_command' in config
    if has_logger:
        config['debug_log_command'](message)
    return None
|
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
|
mask = ((1 << (bit_range[1] - bit_range[0] + 1)) - 1) << bit_range[0]
key_value = (value & mask) >> bit_range[0]
result[key] = key_value
# Return the filled in structure
return result
def configure(self, config):
    """Install *config*, program the SPI device and manage the queue thread.

    Parameters:
        config: dictionary of settings, or None for an empty configuration.
            SPI keys read here (with defaults): 'frequency' (4000000),
            'bits_per_word' (8), 'cshigh' (False), 'no_cs' (False),
            'lsbfirst' (False), 'threewire' (False), 'mode' (1).

    When ``_should_use_queue()`` is true a dequeue thread is started unless
    one already exists; otherwise an existing thread is joined and dropped.
    Always returns None.
    """
    if config is None:
        config = {}
    self._config = config
    # 'cshigh' is the real attribute name; the historical misspelling
    # 'csigh' is still honoured so existing configurations keep working.
    cshigh = config.get('cshigh', config.get('csigh', False))
    with self._get_mutex():
        self._spi.max_speed_hz = config.get('frequency', 4000000)
        self._spi.bits_per_word = config.get('bits_per_word', 8)
        self._spi.cshigh = cshigh
        self._spi.no_cs = config.get('no_cs', False)
        self._spi.lsbfirst = config.get('lsbfirst', False)
        self._spi.threewire = config.get('threewire', False)
        self._spi.mode = config.get('mode', 1)
    # If using a queue, start a thread to run the queue
    if self._should_use_queue():
        if self._dequeue_thread is None:
            self._dequeue_thread = threading.Thread(target = self._run_queue)
            self._dequeue_thread.start()
    elif self._dequeue_thread is not None:
        # Joined outside the mutex block: the queue runner takes that
        # mutex itself while it drains the remaining items.
        self._dequeue_thread.join()
        self._dequeue_thread = None
    return None
def initialize(self):
    """Reset the device, load its defaults and verify the radio.

    Returns:
        True when ``_check_radio()`` reports success, False otherwise.
    """
    self._reset_device()
    self._set_defaults()
    return True if self._check_radio() else False
def set_channel(self, channel):
    """Store *channel* in the 'radio_state' register.

    The register's current fields are read, the 'channel' field is
    replaced, and the result is written back.

    Returns:
        The updated register fields that were written.
    """
    register = 'radio_state'
    radio_state = self.get_register_bits(register)
    radio_state['channel'] = channel
    self.put_register_bits(register, radio_state)
    return radio_state
def set_syncword(self, syncword, force = False, _queue_instead_of_xmit = True):
# If queuing is being used, just store this message
if _queue_instead_of_xmit and self._should_use_queue():
self._enqueue(syncword, None, None)
return None
# Do not set the syncword again if it's not needed
if not force:
if self._syncword is not None:
if syncword == self._syncword:
return None
self._syncword = syncword
packet_config = self.get_register_bits('packet_config')
packet_config['syncword_len'] = len(syncword) - 1
self.put_register_bits('packet_config', packet_config)
if len(syncword) == 1:
|
|
>
>
>
|
|
>
>
>
|
|
|
|
|
|
|
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
|
mask = ((1 << (bit_range[1] - bit_range[0] + 1)) - 1) << bit_range[0]
key_value = (value & mask) >> bit_range[0]
result[key] = key_value
# Return the filled in structure
return result
def configure(self, config, update = True):
    """Install *config*, program the SPI device and manage the queue thread.

    Parameters:
        config: dictionary of settings, or None for an empty one.
            SPI keys read here (with defaults): 'frequency' (4000000),
            'bits_per_word' (8), 'cshigh' (False), 'no_cs' (False),
            'lsbfirst' (False), 'threewire' (False), 'mode' (1).
        update: when true, merge *config* into the existing configuration;
            when false, replace the configuration with *config*.

    When ``_should_use_queue()`` is true a daemon dequeue thread and its
    queue lock are created unless a thread already exists; otherwise an
    existing thread is joined and both are cleared. Always returns None.
    """
    if config is None:
        config = {}
    if update:
        self._config.update(config)
    else:
        self._config = config
    settings = self._config
    # 'cshigh' is the real attribute name; the historical misspelling
    # 'csigh' is still honoured so existing configurations keep working.
    cshigh = settings.get('cshigh', settings.get('csigh', False))
    with self._get_mutex():
        self._spi.max_speed_hz = settings.get('frequency', 4000000)
        self._spi.bits_per_word = settings.get('bits_per_word', 8)
        self._spi.cshigh = cshigh
        self._spi.no_cs = settings.get('no_cs', False)
        self._spi.lsbfirst = settings.get('lsbfirst', False)
        self._spi.threewire = settings.get('threewire', False)
        self._spi.mode = settings.get('mode', 1)
    # If using a queue, start a thread to run the queue
    if self._should_use_queue():
        if self._dequeue_thread is None:
            self._dequeue_thread = threading.Thread(target = self._run_queue, daemon = True)
            # The lock must be in place before the thread starts using it.
            self._software_tx_queue_mutex = threading.Lock()
            self._dequeue_thread.start()
    elif self._dequeue_thread is not None:
        # Joined outside the mutex block: the queue runner takes that
        # mutex itself while it drains the remaining items.
        self._debug("Joining existing thread to wait for termination")
        self._dequeue_thread.join()
        self._dequeue_thread = None
        self._software_tx_queue_mutex = None
    return None
def initialize(self):
    """Reset the device, apply its defaults, then check the radio.

    Returns:
        True if ``_check_radio()`` succeeded, False if it did not.
    """
    self._reset_device()
    self._set_defaults()
    radio_ok = self._check_radio()
    if radio_ok:
        return True
    return False
def set_channel(self, channel):
    """Write *channel* into the 'channel' field of the 'radio_state' register.

    Returns:
        The register fields as written back, including the new channel.
    """
    fields = self.get_register_bits('radio_state')
    fields['channel'] = channel
    self.put_register_bits('radio_state', fields)
    return fields
def set_syncword(self, syncword, force = False, submit_queue = '__DEFAULT__'):
# If queuing is being used, just store this message
if submit_queue is not None and self._should_use_queue():
self._enqueue(submit_queue, syncword, None, None, post_delay = 0)
return None
# Do not set the syncword again if it's not needed
if not force:
if self._last_syncword is not None:
if syncword == self._last_syncword:
return None
self._last_syncword = syncword
packet_config = self.get_register_bits('packet_config')
packet_config['syncword_len'] = len(syncword) - 1
self.put_register_bits('packet_config', packet_config)
if len(syncword) == 1:
|
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
|
with self._get_mutex(lock):
result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10)
self._debug("Writing: {} = {}".format(log_message, result))
return new_message
def transmit(self, message, channel = None, lock = True, _queue_instead_of_xmit = True, post_delay = 0, syncword = None):
# If we are using a radio transmit queue, just queue this message
# (unless we are called from the dequeue procedure)
if _queue_instead_of_xmit and self._should_use_queue():
if syncword is None:
syncword = self._syncword
self._enqueue(syncword, message, channel, post_delay = post_delay)
return True
sent_packet = True
with self._get_mutex(lock):
# Set the syncword
if syncword is not None:
self.set_syncword(syncword, _queue_instead_of_xmit = False)
if channel is None:
state = self.get_register_bits('radio_state')
channel = state['channel']
# Initialize the transmitter
self.put_register_bits('radio_state', {
|
|
|
|
|
|
|
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
|
with self._get_mutex(lock):
result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10)
self._debug("Writing: {} = {}".format(log_message, result))
return new_message
def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'):
# If we are using a radio transmit queue, just queue this message
# (unless we are called from the dequeue procedure)
if submit_queue is not None and self._should_use_queue():
if syncword is None:
syncword = self._last_syncword
self._enqueue(submit_queue, syncword, message, channel, post_delay = post_delay)
return True
sent_packet = True
with self._get_mutex(lock):
# Set the syncword
if syncword is not None:
self.set_syncword(syncword, submit_queue = None)
if channel is None:
state = self.get_register_bits('radio_state')
channel = state['channel']
# Initialize the transmitter
self.put_register_bits('radio_state', {
|
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
|
if radio_status['framer_status'] == 0:
sent_packet = False
break
time.sleep(0.001)
if post_delay != 0:
time.sleep(post_delay)
return sent_packet
def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None):
    """Send *message* ``retries`` times on every channel in *channels*.

    Parameters:
        message: payload handed unchanged to ``transmit``.
        channels: sized collection of channels to send on.
        retries: number of transmissions per channel.
        delay: total delay budget; it is split evenly across the channels
            and passed to ``transmit`` as ``post_delay``.
        syncword: optional syncword passed to ``transmit``.

    Returns:
        False as soon as one ``transmit`` call reports failure, otherwise
        True (including when *channels* is empty and nothing is sent).
    """
    # Nothing to send; also avoids dividing by zero below.
    if len(channels) == 0:
        return True
    delay = delay / len(channels)
    for channel in channels:
        for _ in range(retries):
            if not self.transmit(message, channel, post_delay = delay, syncword = syncword):
                return False
    return True
def _enqueue(self, syncword, message, channel, post_delay = 0.01):
    """Append a transmission request to the per-channel queue.

    Parameters:
        syncword: syncword to apply for this item, or None.
        message: payload to transmit, or None.
        channel: channel to transmit on; also the key of the queue used.
        post_delay: seconds that must pass after this item before the next
            item on the same channel may be dispatched.

    The item is scheduled right after the previous item on the channel
    (its 'time' plus its 'post_delay') but never earlier than now, so a
    burst queued after an idle period keeps its spacing instead of being
    scheduled entirely in the past. Always returns None.
    """
    now = time.time()
    queue = self._channel_queue.setdefault(channel, [])
    if queue:
        last_item = queue[-1]
        dispatch_time = max(now, last_item['time'] + last_item['post_delay'])
    else:
        dispatch_time = now
    queue.append({
        'syncword': syncword,
        'message': message,
        'channel': channel,
        'time': dispatch_time,
        'post_delay': post_delay,
        'transmitted': False
    })
    return None
def _run_queue(self):
    """Thread body: keep dispatching queued items until queuing is disabled.

    Returns immediately when the queue is not in use. Otherwise it loops,
    calling ``_run_queue_once()``; it polls again after 1 ms while items
    remain and after 0.5 s when the queue is empty, and exits once the
    queue is empty and ``_should_use_queue()`` has become false.
    """
    self._debug("Started run_queue process")
    # Do not run the queue if we are not using a queue
    if not self._should_use_queue():
        self._debug("Ending run_queue process, not using the queue")
        return None
    pause = 0
    running = True
    while running:
        time.sleep(pause)
        pending = self._run_queue_once()
        self._debug("Running the queue, {} items left".format(pending))
        if pending != 0:
            # More events to process: try again in 1ms
            pause = 0.001
        elif self._should_use_queue():
            # Nothing to do right now: wait a bit longer before retrying
            pause = 0.5
        else:
            # Queue drained and queuing switched off: let the join finish
            self._debug("Request to stop run_queue process, exiting")
            running = False
    return None
def _run_queue_once(self):
    """Dispatch every queued item whose time has come.

    Items whose 'time' is not later than now are marked transmitted and
    sent through ``transmit()`` with up to three attempts each. An item
    without its own syncword reuses the most recent syncword seen earlier
    in the same pass; items lacking a message or a channel only update
    that syncword.

    Returns:
        The number of items still waiting for a later dispatch time.
    """
    now = time.time()
    to_transmit = []
    items_remaining = 0
    # Work on a snapshot of the mapping: _enqueue() may add a channel from
    # another thread, which would break iteration over the live dict.
    for channel, queue in list(self._channel_queue.items()):
        found_item_in_queue = False
        for item in queue:
            if item['transmitted']:
                continue
            found_item_in_queue = True
            if item['time'] <= now:
                item['transmitted'] = True
                to_transmit.append(item)
                continue
            items_remaining += 1
        # If the channel has only transmitted items, clear it
        # out except for the last item (which will have a
        # record of when the next event should take place).
        # Queues with fewer than two items have nothing to trim.
        if not found_item_in_queue and len(queue) > 1:
            self._channel_queue[channel] = [queue[-1]]
    default_syncword = None
    with self._get_mutex():
        for item in to_transmit:
            syncword = item['syncword']
            message = item['message']
            channel = item['channel']
            if syncword is not None:
                default_syncword = syncword
            else:
                syncword = default_syncword
            if message is None or channel is None:
                continue
            for _ in range(3):
                if self.transmit(message, channel, lock = False, _queue_instead_of_xmit = False, syncword = syncword):
                    break
    return items_remaining
def start_listening(self, channel):
# Initialize the receiver
self.stop_listening()
# Go into listening mode
self.put_register_bits('radio_state', {
|
>
|
|
|
>
|
>
|
>
|
|
|
|
|
|
|
|
|
|
<
|
<
<
<
<
>
>
>
>
|
|
|
>
|
|
|
<
<
<
<
|
|
|
|
|
>
|
<
<
<
|
<
>
|
|
|
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
|
if radio_status['framer_status'] == 0:
sent_packet = False
break
time.sleep(0.001)
if post_delay != 0:
raise ValueError('No sleeping allowed') # XXX
time.sleep(post_delay)
return sent_packet
def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'):
    """Send *message* ``retries`` times on every channel in *channels*.

    Parameters:
        message: payload handed unchanged to ``transmit``.
        channels: sized collection of channels to send on.
        retries: number of transmissions per channel.
        delay: total delay budget; it is split evenly across the channels
            and passed to ``transmit`` as ``post_delay``.
        syncword: optional syncword passed to ``transmit``.
        submit_queue: queue name passed to ``transmit``.

    Returns:
        False as soon as one ``transmit`` call reports failure, otherwise
        True (including when *channels* is empty and nothing is sent).
    """
    # Nothing to send; also avoids dividing by zero below.
    if len(channels) == 0:
        return True
    delay = delay / len(channels)
    for channel in channels:
        for _ in range(retries):
            if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
                return False
    return True
def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0):
    """Append a transmission request to the named software queue.

    Parameters:
        submit_queue: name of the queue to append to (created on demand).
        syncword: syncword to apply for this item, or None.
        message: payload to transmit, or None.
        channel: channel to transmit on, or None.
        post_delay: seconds added to this item's time when scheduling the
            item that follows it in the same queue.

    An item joining an empty queue is scheduled for now; otherwise it is
    scheduled at the previous item's 'time' plus its 'post_delay'.
    The queue lock is held for the whole operation. Always returns None.
    """
    with self._software_tx_queue_mutex:
        pending = self._software_tx_queue.setdefault(submit_queue, [])
        if pending:
            previous = pending[-1]
            when = previous['time'] + previous['post_delay']
        else:
            when = time.time()
        entry = {
            'syncword': syncword,
            'message': message,
            'channel': channel,
            'time': when,
            'post_delay': post_delay
        }
        pending.append(entry)
    return None
def _run_queue(self):
    """Thread body: keep dispatching queued items until queuing is disabled.

    Each pass logs the size of every named queue (under the queue lock),
    runs ``_run_queue_once()`` and then decides how long to sleep: 1 ms
    while items remain, 0.5 s when the queues are empty. The loop ends
    once the queues are empty and ``_should_use_queue()`` is false.
    """
    self._debug("Started run_queue process")
    pause = 0
    while True:
        self._debug("Sleeping for {} seconds".format(pause))
        time.sleep(pause)
        with self._software_tx_queue_mutex:
            for name, entries in self._software_tx_queue.items():
                self._debug("Running the queue named {}: {} items left".format(name, len(entries)))
        # Must run outside the block above: _run_queue_once() takes the
        # same queue lock itself.
        remaining = self._run_queue_once()
        self._debug("Completed running the queue, {} items left (continue queue = {})".format(remaining, self._should_use_queue()))
        if remaining != 0:
            # More events to process: try again in 1ms
            pause = 0.001
            continue
        # The queue is empty; stop if queuing has been switched off
        # (the caller is expected to join this thread).
        if not self._should_use_queue():
            self._debug("Request to stop run_queue process, exiting")
            return None
        # Nothing to do right now: wait a bit longer before retrying
        pause = 0.5
def _run_queue_once(self):
    """Dispatch every queued item whose time has come.

    Under the queue lock, each named queue is split into items that are
    due (their 'time' is not later than now) and items that stay queued;
    due items are tagged with 'from_queue' and removed from their queue.
    The due items are then sent through ``transmit()`` under the device
    mutex, with up to three attempts each. An item without its own
    syncword reuses the most recent syncword seen earlier in the same
    pass; items lacking a message or a channel only update that syncword.

    Returns:
        The number of items left waiting across all queues.
    """
    cutoff = time.time()
    due = []
    still_waiting = 0
    with self._software_tx_queue_mutex:
        for queue_name in self._software_tx_queue:
            deferred = []
            for entry in self._software_tx_queue[queue_name]:
                if entry['time'] <= cutoff:
                    entry['from_queue'] = queue_name
                    due.append(entry)
                else:
                    deferred.append(entry)
                    still_waiting += 1
            self._software_tx_queue[queue_name] = deferred
    carried_syncword = None
    with self._get_mutex():
        for entry in due:
            self._debug("Transmitting item {}".format(entry))
            word = entry['syncword']
            payload = entry['message']
            target = entry['channel']
            if word is None:
                word = carried_syncword
            else:
                carried_syncword = word
            if payload is None or target is None:
                continue
            attempts = 0
            while attempts < 3:
                if self.transmit(payload, target, lock = False, submit_queue = None, syncword = word):
                    break
                attempts += 1
    return still_waiting
def start_listening(self, channel):
# Initialize the receiver
self.stop_listening()
# Go into listening mode
self.put_register_bits('radio_state', {
|