228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
|
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
|
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
|
spi = spidev.SpiDev()
spi.open(spi_bus, spi_dev)
self._spi = spi
self._dequeue_thread = None
self._last_syncword = None
self._software_tx_queue = {}
self._software_tx_queue_next_time = {}
self.configure(config, update = False)
if len(self._register_map) != 53:
raise ValueError('Inconsistent register map!')
self._software_tx_queue = {}
return None
def __del__(self):
self._debug('Deleting object')
self._config['use_software_tx_queue'] = False
self._spi.close()
def _debug(self, message):
if self._config is not None and 'debug_log_command' in self._config:
if 'debug_log_command' in self._config:
self._config['debug_log_command'](message)
return None
def _error(self, message):
log_command = None
if 'error_log_command' in self._config:
log_command = self._config['error_log_command']
elif 'debug_log_command' in self._config:
log_command = self._config['debug_log_command']
log_command(message)
return None
def _get_mutex(self, real_mutex = True):
if not real_mutex:
return dummy_context_mgr()
mutex = self._config.get('mutex', dummy_context_mgr())
return mutex
|
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
|
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
|
-
+
|
if include_length:
new_message = new_message + [len(message)]
new_message = new_message + message
log_message = [] + new_message
# Transfer the message
with self._get_mutex(lock):
result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10)
result = self._spi.xfer(new_message, self._spi.max_speed_hz, 0)
self._debug("Writing: {} = {}".format(log_message, result))
return new_message
def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'):
# If we are using a radio transmit queue, just queue this message
|
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
|
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
|
-
-
-
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
-
+
-
-
-
-
-
-
-
+
-
-
+
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
+
+
-
-
-
-
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
-
-
+
|
if radio_status['framer_status'] == 0:
sent_packet = False
break
time.sleep(0.001)
if post_delay != 0:
raise ValueError('No sleeping allowed') # XXX
time.sleep(post_delay)
return sent_packet
def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'):
delay = delay / len(channels)
for channel in channels:
if len(channels) == 0 or retries == 0:
self._error("Asked to send the message {} a total of zero times ({} channels, {} retries)".format(message, channels, retries))
for channel_idx in range(len(channels)):
if channel_idx == (len(channels) - 1):
retries -= 1
channel = channels[channel_idx]
for i in range(retries):
if not self.transmit(message, channel, post_delay = 0, syncword = syncword, submit_queue = submit_queue):
return False
if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
return False
if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
return False
return True
def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0):
if not self._should_use_queue():
raise ValueError('internal error: _enqueue called with queueing disabled')
with self._software_tx_queue_mutex:
if submit_queue not in self._software_tx_queue:
self._software_tx_queue[submit_queue] = []
self._software_tx_queue[submit_queue] = collections.deque([])
if len(self._software_tx_queue[submit_queue]) == 0:
dispatch_time = time.time()
else:
last_item = self._software_tx_queue[submit_queue][-1]
dispatch_time = last_item['time'] + last_item['post_delay']
self._software_tx_queue[submit_queue].append({
'syncword': syncword,
'message': message,
'channel': channel,
'time': dispatch_time,
'post_delay': post_delay
})
return None
def _run_queue(self):
self._debug("Started run_queue process")
sleep_time = 0
while True:
if sleep_time != 0:
self._debug("Sleeping for {} seconds".format(sleep_time))
time.sleep(sleep_time)
self._debug("Sleeping for {} seconds".format(sleep_time))
time.sleep(sleep_time)
with self._software_tx_queue_mutex:
for queue in self._software_tx_queue:
if len(self._software_tx_queue[queue]) != 0:
self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue])))
item_count = self._run_queue_once()
self._debug("Completed running the queue, {} items left (continue queue = {})".format(item_count, self._should_use_queue()))
self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue])))
try:
[processed_items, remaining_items] = self._run_queue_once()
except Exception as error_info:
self._error("Failed to run queue: {}".format(error_info.args))
processed_items = 0
remaining_items = 0
self._debug("Completed running the queue, did {} items and {} items left (continue queue = {})".format(processed_items, remaining_items, self._should_use_queue()))
if item_count == 0:
if remaining_items == 0:
# If the queue is empty and we are no longer queuing
# events, exit this function (which should be joined)
if not self._should_use_queue():
self._debug("Request to stop run_queue process, exiting")
return None
# If there are no events, wait a bit
# longer before trying again
sleep_time = 0.5
continue
if processed_items == 0:
# If we processed no items, but there are items left to
# process, back off slightly
if sleep_time == 0:
sleep_time = 0.001
else:
sleep_time = min(0.5, sleep_time * 2)
continue
# If there are more events to process, try again in 1ms
sleep_time = 0.001
# If there are more events to process, try again
sleep_time = 0
return None
def _run_queue_once(self):
now = time.time()
to_transmit = []
remaining_items = 0
now = time.time()
with self._software_tx_queue_mutex:
for submit_queue in self._software_tx_queue:
# Determine if we should run this queue yet
new_software_tx_queue = []
for item in self._software_tx_queue[submit_queue]:
if item['time'] <= now:
item['from_queue'] = submit_queue
if submit_queue not in self._software_tx_queue_next_time:
self._software_tx_queue_next_time[submit_queue] = now
queue_next_time = self._software_tx_queue_next_time[submit_queue]
if now < queue_next_time:
remaining_items += len(self._software_tx_queue[submit_queue])
to_transmit.append(item)
continue
else:
new_software_tx_queue.append(item)
remaining_items += 1
self._software_tx_queue[submit_queue] = new_software_tx_queue
continue
# Record how many items to pop off this queue
pop_items = 0
for item in self._software_tx_queue[submit_queue]:
pop_items += 1
# If the last item we're about to transmit requires a delay, make
# a note of it in the queue time and don't pull anything else
# from this queue
self._software_tx_queue_next_time[submit_queue] = now + item['post_delay']
if item['post_delay'] != 0:
break
# Pop off the items to transmit in this run into a list
if pop_items != 0:
self._debug("Found {} items to transmit in the {} queue".format(pop_items, submit_queue))
while pop_items != 0:
to_transmit.append(self._software_tx_queue[submit_queue].popleft())
pop_items -= 1
remaining_items += len(self._software_tx_queue[submit_queue])
default_syncword = None
self._debug("Getting ready to transmit {} items".format(len(to_transmit)))
with self._get_mutex():
for item in to_transmit:
self._debug("Transmitting item {}".format(item))
syncword = item['syncword']
message = item['message']
channel = item['channel']
if syncword is not None:
default_syncword = syncword
else:
syncword = default_syncword
if message is None or channel is None:
continue
for retry in range(3):
if self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword):
self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword, post_delay = 0)
break
return remaining_items
return [len(to_transmit), remaining_items]
def start_listening(self, channel):
# Initialize the receiver
self.stop_listening()
# Go into listening mode
self.put_register_bits('radio_state', {
|