Overview
Comment: | Added support for multiple queues, and ensured queue management is locked |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
089d2c0ce83867ff200cabf54928393f |
User & Date: | rkeene on 2020-05-01 01:33:30 |
Other Links: | manifest | tags |
Context
2020-05-02
| ||
15:10 | Substantial improvement to software queue management check-in: 5f845c3e3c user: rkeene tags: trunk | |
2020-05-01
| ||
01:33 | Added support for multiple queues, and ensured queue management is locked check-in: 089d2c0ce8 user: rkeene tags: trunk | |
2020-04-30
| ||
17:50 | Added a software tx queue and support for setting the syncword as part of the transmit call check-in: fc5d84e9e1 user: rkeene tags: trunk | |
Changes
Modified lt8900_spi/__init__.py from [05e9f1b5b0] to [3c959f71b4].
︙ | ︙ | |||
def __init__(self, spi_bus, spi_dev, config = None):
    """Open the SPI device and apply the initial configuration.

    Parameters:
        spi_bus: bus number handed to ``spidev.SpiDev.open``.
        spi_dev: device number handed to ``spidev.SpiDev.open``.
        config: optional configuration dictionary passed to
            ``configure(config, update = False)``; ``None`` is accepted.

    Raises:
        ValueError: if the register map does not hold exactly 53 entries.
    """
    spi = spidev.SpiDev()
    spi.open(spi_bus, spi_dev)

    self._spi = spi

    self._dequeue_thread = None
    self._last_syncword = None

    # The queue storage has to exist *before* configure() runs:
    # configure() may start the dequeue thread, and that thread
    # iterates over self._software_tx_queue right away.
    self._software_tx_queue = {}

    self.configure(config, update = False)

    if len(self._register_map) != 53:
        raise ValueError('Inconsistent register map!')

def __del__(self):
    """Turn off software queuing and close the SPI handle.

    Tolerates a partially constructed object (for example when
    ``__init__`` raised before ``_config`` or ``_spi`` were assigned),
    so the finalizer itself never fails on a missing attribute.
    """
    self._debug('Deleting object')

    config = getattr(self, '_config', None)
    if config is not None:
        # NOTE(review): presumably this flag is what tells the dequeue
        # thread to stop -- confirm against _should_use_queue()
        config['use_software_tx_queue'] = False

    spi = getattr(self, '_spi', None)
    if spi is not None:
        spi.close()

def _debug(self, message):
    """Pass ``message`` to the configured ``debug_log_command``, if any.

    Does nothing when no configuration is present (including before
    ``configure()`` has run) or when it has no ``debug_log_command``
    entry.  Always returns ``None``.
    """
    config = getattr(self, '_config', None)
    if config is not None and 'debug_log_command' in config:
        config['debug_log_command'](message)

    return None
︙ | ︙ | |||
411 412 413 414 415 416 417 | mask = ((1 << (bit_range[1] - bit_range[0] + 1)) - 1) << bit_range[0] key_value = (value & mask) >> bit_range[0] result[key] = key_value # Return the filled in structure return result | | > > > | | > > > | | | | | | | 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 | mask = ((1 << (bit_range[1] - bit_range[0] + 1)) - 1) << bit_range[0] key_value = (value & mask) >> bit_range[0] result[key] = key_value # Return the filled in structure return result def configure(self, config, update = True): if config is None: config = {} if update: self._config.update(config) else: self._config = config with self._get_mutex(): self._spi.max_speed_hz = self._config.get('frequency', 4000000) self._spi.bits_per_word = self._config.get('bits_per_word', 8) self._spi.cshigh = self._config.get('csigh', False) self._spi.no_cs = self._config.get('no_cs', False) self._spi.lsbfirst = self._config.get('lsbfirst', False) self._spi.threewire = self._config.get('threewire', False) self._spi.mode = self._config.get('mode', 1) # If using a queue, start a thread to run the queue if self._should_use_queue(): if self._dequeue_thread is None: self._dequeue_thread = threading.Thread(target = self._run_queue, daemon = True) self._software_tx_queue_mutex = threading.Lock() self._dequeue_thread.start() else: if self._dequeue_thread is not None: self._debug("Joining existing thread to wait for termination") self._dequeue_thread.join() self._dequeue_thread = None self._software_tx_queue_mutex = None return None def initialize(self): self._reset_device() self._set_defaults() if not self._check_radio(): return False return True def set_channel(self, channel): state = self.get_register_bits('radio_state') state['channel'] = 
channel self.put_register_bits('radio_state', state) return state def set_syncword(self, syncword, force = False, submit_queue = '__DEFAULT__'): # If queuing is being used, just store this message if submit_queue is not None and self._should_use_queue(): self._enqueue(submit_queue, syncword, None, None, post_delay = 0) return None # Do not set the syncword again if it's not needed if not force: if self._last_syncword is not None: if syncword == self._last_syncword: return None self._last_syncword = syncword packet_config = self.get_register_bits('packet_config') packet_config['syncword_len'] = len(syncword) - 1 self.put_register_bits('packet_config', packet_config) if len(syncword) == 1: |
︙ | ︙ | |||
508 509 510 511 512 513 514 | with self._get_mutex(lock): result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10) self._debug("Writing: {} = {}".format(log_message, result)) return new_message | | | | | | | 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 | with self._get_mutex(lock): result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10) self._debug("Writing: {} = {}".format(log_message, result)) return new_message def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'): # If we are using a radio transmit queue, just queue this message # (unless we are called from the dequeue procedure) if submit_queue is not None and self._should_use_queue(): if syncword is None: syncword = self._last_syncword self._enqueue(submit_queue, syncword, message, channel, post_delay = post_delay) return True sent_packet = True with self._get_mutex(lock): # Set the syncword if syncword is not None: self.set_syncword(syncword, submit_queue = None) if channel is None: state = self.get_register_bits('radio_state') channel = state['channel'] # Initialize the transmitter self.put_register_bits('radio_state', { |
︙ | ︙ | |||
565 566 567 568 569 570 571 572 573 574 575 | if radio_status['framer_status'] == 0: sent_packet = False break time.sleep(0.001) if post_delay != 0: time.sleep(post_delay) return sent_packet | > | | | > | > | > | | | | | | | | | | < | < < < < > > > > | | | > | | | < < < < | | | | | > | < < < | < > | | | 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 | if radio_status['framer_status'] == 0: sent_packet = False break time.sleep(0.001) if post_delay != 0: raise ValueError('No sleeping allowed') # XXX time.sleep(post_delay) return sent_packet def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'): delay = delay / len(channels) for channel in channels: for i in range(retries): if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue): return False return True def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0): with self._software_tx_queue_mutex: if submit_queue not in self._software_tx_queue: self._software_tx_queue[submit_queue] = [] if len(self._software_tx_queue[submit_queue]) == 0: dispatch_time = time.time() else: last_item = self._software_tx_queue[submit_queue][-1] dispatch_time = last_item['time'] + last_item['post_delay'] self._software_tx_queue[submit_queue].append({ 'syncword': syncword, 'message': message, 'channel': channel, 'time': dispatch_time, 'post_delay': post_delay }) return None def _run_queue(self): self._debug("Started run_queue process") sleep_time = 0 while True: self._debug("Sleeping for {} 
seconds".format(sleep_time)) time.sleep(sleep_time) with self._software_tx_queue_mutex: for queue in self._software_tx_queue: self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue]))) item_count = self._run_queue_once() self._debug("Completed running the queue, {} items left (continue queue = {})".format(item_count, self._should_use_queue())) if item_count == 0: # If the queue is empty and we are no longer queuing # events, exit this function (which should be joined) if not self._should_use_queue(): self._debug("Request to stop run_queue process, exiting") return None # If there are no events, wait a bit # longer before trying again sleep_time = 0.5 continue # If there are more events to process, try again in 1ms sleep_time = 0.001 return None def _run_queue_once(self): now = time.time() to_transmit = [] remaining_items = 0 with self._software_tx_queue_mutex: for submit_queue in self._software_tx_queue: new_software_tx_queue = [] for item in self._software_tx_queue[submit_queue]: if item['time'] <= now: item['from_queue'] = submit_queue to_transmit.append(item) continue else: new_software_tx_queue.append(item) remaining_items += 1 self._software_tx_queue[submit_queue] = new_software_tx_queue default_syncword = None with self._get_mutex(): for item in to_transmit: self._debug("Transmitting item {}".format(item)) syncword = item['syncword'] message = item['message'] channel = item['channel'] if syncword is not None: default_syncword = syncword else: syncword = default_syncword if message is None or channel is None: continue for retry in range(3): if self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword): break return remaining_items def start_listening(self, channel): # Initialize the receiver self.stop_listening() # Go into listening mode self.put_register_bits('radio_state', { |
︙ | ︙ |