Overview
| Comment: | Substantial improvement to software queue management |
|---|---|
| Downloads: | Tarball | ZIP archive |
| Timelines: | family | ancestors | descendants | both | trunk |
| Files: | files | file ages | folders |
| SHA3-256: |
5f845c3e3c2336922689a0b86aa9e9eb |
| User & Date: | rkeene on 2020-05-02 15:10:33.161 |
| Other Links: | manifest | tags |
Context
|
2020-05-02
| ||
| 15:12 | lt8900_spi v2.3 check-in: 049d34a933 user: rkeene tags: 2.3, trunk | |
| 15:10 | Substantial improvement to software queue management check-in: 5f845c3e3c user: rkeene tags: trunk | |
|
2020-05-01
| ||
| 01:33 | Added support for multiple queues, and ensured queue management is locked check-in: 089d2c0ce8 user: rkeene tags: trunk | |
Changes
Modified lt8900_spi/__init__.py
from [3c959f71b4]
to [dd13b908ed].
| ︙ | ︙ | |||
18 19 20 21 22 23 24 25 26 27 28 29 30 31 | # | P1-17 | P1-18 | P1-21 | P1-19 | P1-23 | P1-24 | P1-25 | # -________________________________________________________- # Raspberry Pi import spidev import time import threading class dummy_context_mgr(): def __enter__(self): return None def __exit__(self, exc_type, exc_value, traceback): return False | > | 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | # | P1-17 | P1-18 | P1-21 | P1-19 | P1-23 | P1-24 | P1-25 | # -________________________________________________________- # Raspberry Pi import spidev import time import threading import collections class dummy_context_mgr(): def __enter__(self): return None def __exit__(self, exc_type, exc_value, traceback): return False |
| ︙ | ︙ | |||
228 229 230 231 232 233 234 235 236 237 238 239 |
spi = spidev.SpiDev()
spi.open(spi_bus, spi_dev)
self._spi = spi
self._dequeue_thread = None
self._last_syncword = None
self.configure(config, update = False)
if len(self._register_map) != 53:
raise ValueError('Inconsistent register map!')
| > > > < < | > > > > > > > > > | 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
spi = spidev.SpiDev()
spi.open(spi_bus, spi_dev)
self._spi = spi
self._dequeue_thread = None
self._last_syncword = None
self._software_tx_queue = {}
self._software_tx_queue_next_time = {}
self.configure(config, update = False)
if len(self._register_map) != 53:
raise ValueError('Inconsistent register map!')
return None
def __del__(self):
    """Finalizer: ask the software TX queue to stop and close the SPI device.

    Python may run __del__ on an object whose __init__ raised part-way
    through, so every attribute used here is looked up defensively
    instead of assuming it exists.
    """
    config = getattr(self, '_config', None)
    if config is not None:
        self._debug('Deleting object')
        # Flag that the software transmit queue should no longer be used
        config['use_software_tx_queue'] = False

    spi = getattr(self, '_spi', None)
    if spi is not None:
        spi.close()
def _debug(self, message):
    """Pass `message` to the configured 'debug_log_command' callable, if any.

    Does nothing when the configuration has no 'debug_log_command' entry.
    Always returns None.
    """
    settings = self._config
    if 'debug_log_command' not in settings:
        return None

    emit = settings['debug_log_command']
    emit(message)
    return None
def _error(self, message):
    """Report `message` through the configured error logger.

    The 'error_log_command' callable from the configuration is preferred;
    if it is absent the 'debug_log_command' callable is used instead.
    When neither is configured the message is dropped, mirroring how
    _debug() behaves without a logger.  Always returns None.
    """
    log_command = None
    if 'error_log_command' in self._config:
        log_command = self._config['error_log_command']
    elif 'debug_log_command' in self._config:
        log_command = self._config['debug_log_command']

    # With no logger configured there is nothing to call; calling None
    # here would turn every reported error into a TypeError.
    if log_command is None:
        return None

    log_command(message)
    return None
def _get_mutex(self, real_mutex = True):
    """Return the context manager to wrap around a guarded section.

    When `real_mutex` is false a fresh dummy_context_mgr is returned and
    the configuration is not consulted.  Otherwise the 'mutex' entry of
    the configuration is returned, falling back to a dummy_context_mgr
    when no 'mutex' is configured.
    """
    placeholder = dummy_context_mgr()
    if real_mutex:
        return self._config.get('mutex', placeholder)
    return placeholder
|
| ︙ | ︙ | |||
510 511 512 513 514 515 516 | if include_length: new_message = new_message + [len(message)] new_message = new_message + message log_message = [] + new_message # Transfer the message with self._get_mutex(lock): | | | 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 |
if include_length:
new_message = new_message + [len(message)]
new_message = new_message + message
log_message = [] + new_message
# Transfer the message
with self._get_mutex(lock):
result = self._spi.xfer(new_message, self._spi.max_speed_hz, 0)
self._debug("Writing: {} = {}".format(log_message, result))
return new_message
def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'):
# If we are using a radio transmit queue, just queue this message
|
| ︙ | ︙ | |||
573 574 575 576 577 578 579 | if radio_status['framer_status'] == 0: sent_packet = False break time.sleep(0.001) if post_delay != 0: | < | > > | > > > > > | | > > > | < < < < < < < > | | > > | > > | > > > > | > | > > > > > > > > > | | < > > | | | > > | < | | > > | | > > > > > > > > > > > > > > > | > < | < | | 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 |
if radio_status['framer_status'] == 0:
sent_packet = False
break
time.sleep(0.001)
if post_delay != 0:
time.sleep(post_delay)
return sent_packet
def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'):
    """Transmit `message` repeatedly on each of `channels`.

    Each channel gets `retries` back-to-back transmissions with no
    post-delay, except the last channel, which gets one fewer because a
    final transmission on it (with `delay` as its post-delay) follows the
    loop.  `syncword` and `submit_queue` are passed through to transmit().

    Returns True when every transmit() call reported success, and False
    as soon as one fails or when `channels` is empty.
    """
    if len(channels) == 0 or retries == 0:
        self._error("Asked to send the message {} a total of zero times ({} channels, {} retries)".format(message, channels, retries))

    # With no channels there is nothing to transmit on; bail out here
    # instead of reaching the final transmit with `channel` never bound.
    if len(channels) == 0:
        return False

    # NOTE(review): with retries == 0 and a non-empty channel list the
    # final transmit below still sends the message once on the last
    # channel; that existing behaviour is left unchanged.
    last_idx = len(channels) - 1
    for channel_idx, channel in enumerate(channels):
        if channel_idx == last_idx:
            # The final, delayed transmit below accounts for one attempt
            retries -= 1

        for _ in range(retries):
            if not self.transmit(message, channel, post_delay = 0, syncword = syncword, submit_queue = submit_queue):
                return False

    # `channel` is still the last channel from the loop above
    if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
        return False

    return True
def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0):
    """Append one pending transmission to the named software TX queue.

    The queue named `submit_queue` is created as an empty deque on first
    use.  The queued entry records `syncword`, `message`, `channel` and
    `post_delay`.  Raises ValueError if _should_use_queue() says queueing
    is disabled.  Always returns None.
    """
    if not self._should_use_queue():
        raise ValueError('internal error: _enqueue called with queueing disabled')

    entry = {
        'syncword': syncword,
        'message': message,
        'channel': channel,
        'post_delay': post_delay
    }

    # All queue bookkeeping happens while holding the queue mutex
    with self._software_tx_queue_mutex:
        pending = self._software_tx_queue.setdefault(submit_queue, collections.deque())
        pending.append(entry)

    return None
def _run_queue(self):
    """Repeatedly drain the software TX queues until asked to stop.

    Each pass logs the non-empty queues, calls _run_queue_once(), and
    then picks how long to sleep before the next pass.  The loop exits
    (returning None) once a pass leaves nothing queued and
    _should_use_queue() reports that queueing is disabled.
    Presumably this runs on the dequeue thread -- verify against caller.
    """
    self._debug("Started run_queue process")

    backoff = 0
    while True:
        if backoff != 0:
            self._debug("Sleeping for {} seconds".format(backoff))
            time.sleep(backoff)

        # Report every queue that currently has work pending
        with self._software_tx_queue_mutex:
            for name in self._software_tx_queue:
                if len(self._software_tx_queue[name]) != 0:
                    self._debug("Running the queue named {}: {} items left".format(name, len(self._software_tx_queue[name])))

        # A failed pass is logged and treated as "nothing done, nothing left"
        try:
            processed_items, remaining_items = self._run_queue_once()
        except Exception as failure:
            self._error("Failed to run queue: {}".format(failure.args))
            processed_items = 0
            remaining_items = 0

        self._debug("Completed running the queue, did {} items and {} items left (continue queue = {})".format(processed_items, remaining_items, self._should_use_queue()))

        if remaining_items == 0:
            # Nothing queued: stop if queueing has been switched off
            # (the caller is expected to join on this function) ...
            if not self._should_use_queue():
                self._debug("Request to stop run_queue process, exiting")
                return None

            # ... otherwise idle for a while before looking again
            backoff = 0.5
        elif processed_items == 0:
            # Items are waiting but none were ready: back off, doubling
            # from 1ms up to a ceiling of half a second
            backoff = 0.001 if backoff == 0 else min(0.5, backoff * 2)
        else:
            # Made progress and more remains: go again immediately
            backoff = 0
def _run_queue_once(self):
    """Make one pass over the software TX queues and transmit what is due.

    Under the queue mutex, every queue whose next-run time has arrived
    gives up its leading items, up to and including the first item that
    has a non-zero 'post_delay'; that delay pushes the queue's next-run
    time into the future.  The collected items are then transmitted
    while holding the mutex returned by _get_mutex().

    Returns a two-element list: [items taken this pass, items still queued].
    """
    now = time.time()
    batch = []
    remaining_items = 0

    with self._software_tx_queue_mutex:
        next_times = self._software_tx_queue_next_time
        for queue_name in self._software_tx_queue:
            pending = self._software_tx_queue[queue_name]

            # A queue seen for the first time is runnable right away
            if now < next_times.setdefault(queue_name, now):
                # Not due yet: count its items and leave it alone
                remaining_items += len(pending)
                continue

            # Count how many leading items to take.  Each item examined
            # moves the queue's next-run time to now + its post_delay,
            # and an item with a delay ends the run for this queue.
            take = 0
            for entry in pending:
                take += 1
                next_times[queue_name] = now + entry['post_delay']
                if entry['post_delay'] != 0:
                    break

            if take != 0:
                self._debug("Found {} items to transmit in the {} queue".format(take, queue_name))

            # Move the selected items from the queue into this pass's batch
            for _ in range(take):
                batch.append(pending.popleft())

            remaining_items += len(pending)

    self._debug("Getting ready to transmit {} items".format(len(batch)))

    # Items without a syncword reuse the most recent one seen in this batch
    last_syncword = None
    with self._get_mutex():
        for entry in batch:
            self._debug("Transmitting item {}".format(entry))

            syncword = entry['syncword']
            message = entry['message']
            channel = entry['channel']

            if syncword is None:
                syncword = last_syncword
            else:
                last_syncword = syncword

            # Entries lacking a message or a channel are not transmitted
            if message is None or channel is None:
                continue

            # The mutex is already held, so transmit() is told not to lock
            self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword, post_delay = 0)

    return [len(batch), remaining_items]
def start_listening(self, channel):
# Initialize the receiver
self.stop_listening()
# Go into listening mode
self.put_register_bits('radio_state', {
|
| ︙ | ︙ |