Overview
Comment: | Substantial improvement to software queue management |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
5f845c3e3c2336922689a0b86aa9e9eb |
User & Date: | rkeene on 2020-05-02 15:10:33 |
Other Links: | manifest | tags |
Context
2020-05-02
| ||
15:12 | lt8900_spi v2.3 check-in: 049d34a933 user: rkeene tags: 2.3, trunk | |
15:10 | Substantial improvement to software queue management check-in: 5f845c3e3c user: rkeene tags: trunk | |
2020-05-01
| ||
01:33 | Added support for multiple queues, and ensured queue management is locked check-in: 089d2c0ce8 user: rkeene tags: trunk | |
Changes
Modified lt8900_spi/__init__.py from [3c959f71b4] to [dd13b908ed].
︙ | ︙ | |||
18 19 20 21 22 23 24 25 26 27 28 29 30 31 | # | P1-17 | P1-18 | P1-21 | P1-19 | P1-23 | P1-24 | P1-25 | # -________________________________________________________- # Raspberry Pi import spidev import time import threading class dummy_context_mgr(): def __enter__(self): return None def __exit__(self, exc_type, exc_value, traceback): return False | > | 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | # | P1-17 | P1-18 | P1-21 | P1-19 | P1-23 | P1-24 | P1-25 | # -________________________________________________________- # Raspberry Pi import spidev import time import threading import collections class dummy_context_mgr(): def __enter__(self): return None def __exit__(self, exc_type, exc_value, traceback): return False |
︙ | ︙ | |||
228 229 230 231 232 233 234 235 236 237 238 239 | spi = spidev.SpiDev() spi.open(spi_bus, spi_dev) self._spi = spi self._dequeue_thread = None self._last_syncword = None self.configure(config, update = False) if len(self._register_map) != 53: raise ValueError('Inconsistent register map!') | > > > < < | > > > > > > > > > | 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 | spi = spidev.SpiDev() spi.open(spi_bus, spi_dev) self._spi = spi self._dequeue_thread = None self._last_syncword = None self._software_tx_queue = {} self._software_tx_queue_next_time = {} self.configure(config, update = False) if len(self._register_map) != 53: raise ValueError('Inconsistent register map!') return None def __del__(self): self._debug('Deleting object') self._config['use_software_tx_queue'] = False self._spi.close() def _debug(self, message): if 'debug_log_command' in self._config: self._config['debug_log_command'](message) return None def _error(self, message): log_command = None if 'error_log_command' in self._config: log_command = self._config['error_log_command'] elif 'debug_log_command' in self._config: log_command = self._config['debug_log_command'] log_command(message) return None def _get_mutex(self, real_mutex = True): if not real_mutex: return dummy_context_mgr() mutex = self._config.get('mutex', dummy_context_mgr()) return mutex |
︙ | ︙ | |||
510 511 512 513 514 515 516 | if include_length: new_message = new_message + [len(message)] new_message = new_message + message log_message = [] + new_message # Transfer the message with self._get_mutex(lock): | | | 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 | if include_length: new_message = new_message + [len(message)] new_message = new_message + message log_message = [] + new_message # Transfer the message with self._get_mutex(lock): result = self._spi.xfer(new_message, self._spi.max_speed_hz, 0) self._debug("Writing: {} = {}".format(log_message, result)) return new_message def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'): # If we are using a radio transmit queue, just queue this message |
︙ | ︙ | |||
573 574 575 576 577 578 579 | if radio_status['framer_status'] == 0: sent_packet = False break time.sleep(0.001) if post_delay != 0: | < | > > | > > > > > | | > > > | < < < < < < < > | | > > | > > | > > > > | > | > > > > > > > > > | | < > > | | | > > | < | | > > | | > > > > > > > > > > > > > > > | > < | < | | 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 | if radio_status['framer_status'] == 0: sent_packet = False break time.sleep(0.001) if post_delay != 0: time.sleep(post_delay) return sent_packet def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'): if len(channels) == 0 or retries == 0: self._error("Asked to send the message {} a total of zero times ({} channels, {} retries)".format(message, channels, retries)) for channel_idx in range(len(channels)): if channel_idx == (len(channels) - 1): retries -= 1 channel = channels[channel_idx] for i in range(retries): if not self.transmit(message, channel, post_delay = 0, syncword = syncword, submit_queue = submit_queue): return False if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue): return False return True def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0): if not self._should_use_queue(): raise ValueError('internal error: _enqueue called with queueing disabled') with self._software_tx_queue_mutex: if submit_queue not in self._software_tx_queue: self._software_tx_queue[submit_queue] = collections.deque([]) self._software_tx_queue[submit_queue].append({ 'syncword': syncword, 'message': message, 'channel': channel, 'post_delay': post_delay }) return None def _run_queue(self): self._debug("Started run_queue process") sleep_time = 0 while True: if sleep_time != 0: self._debug("Sleeping for {} seconds".format(sleep_time)) time.sleep(sleep_time) with self._software_tx_queue_mutex: for queue in self._software_tx_queue: if len(self._software_tx_queue[queue]) != 0: self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue]))) try: [processed_items, remaining_items] = self._run_queue_once() except Exception as error_info: self._error("Failed to run queue: {}".format(error_info.args)) processed_items = 0 remaining_items = 0 self._debug("Completed running the queue, did {} items and {} items left (continue queue = {})".format(processed_items, remaining_items, self._should_use_queue())) if remaining_items == 0: # If the queue is empty and we are no longer queuing # events, exit this function (which should be joined) if not self._should_use_queue(): self._debug("Request to stop run_queue process, exiting") return None # If there are no events, wait a bit # longer before trying again sleep_time = 0.5 continue if processed_items == 0: # If we processed no items, but there are items left to # process, back off slightly if sleep_time == 0: sleep_time = 0.001 else: sleep_time = min(0.5, sleep_time * 2) continue # If there are more events to process, try again sleep_time = 0 return None def _run_queue_once(self): to_transmit = [] remaining_items = 0 now = time.time() with self._software_tx_queue_mutex: for submit_queue in self._software_tx_queue: # Determine if we should run this queue yet if submit_queue not in self._software_tx_queue_next_time: self._software_tx_queue_next_time[submit_queue] = now queue_next_time = self._software_tx_queue_next_time[submit_queue] if now < queue_next_time: remaining_items += len(self._software_tx_queue[submit_queue]) continue # Record how many items to pop off this queue pop_items = 0 for item in self._software_tx_queue[submit_queue]: pop_items += 1 # If the last item we're about to transmit requires a delay, make # a note of it in the queue time and don't pull anything else # from this queue self._software_tx_queue_next_time[submit_queue] = now + item['post_delay'] if item['post_delay'] != 0: break # Pop off the items to transmit in this run into a list if pop_items != 0: self._debug("Found {} items to transmit in the {} queue".format(pop_items, submit_queue)) while pop_items != 0: to_transmit.append(self._software_tx_queue[submit_queue].popleft()) pop_items -= 1 remaining_items += len(self._software_tx_queue[submit_queue]) default_syncword = None self._debug("Getting ready to transmit {} items".format(len(to_transmit))) with self._get_mutex(): for item in to_transmit: self._debug("Transmitting item {}".format(item)) syncword = item['syncword'] message = item['message'] channel = item['channel'] if syncword is not None: default_syncword = syncword else: syncword = default_syncword if message is None or channel is None: continue self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword, post_delay = 0) return [len(to_transmit), remaining_items] def start_listening(self, channel): # Initialize the receiver self.stop_listening() # Go into listening mode self.put_register_bits('radio_state', { |
︙ | ︙ |