Index: lt8900_spi/__init__.py ================================================================== --- lt8900_spi/__init__.py +++ lt8900_spi/__init__.py @@ -19,10 +19,17 @@ # -________________________________________________________- # Raspberry Pi import spidev import time +import threading + +class dummy_context_mgr(): + def __enter__(self): + return None + def __exit__(self, exc_type, exc_value, traceback): + return False class Radio: _register_map = [ {'name': "Unknown"}, # 0 {'name': "Unknown"}, # 1 @@ -219,15 +226,20 @@ def __init__(self, spi_bus, spi_dev, config = None): spi = spidev.SpiDev() spi.open(spi_bus, spi_dev) self._spi = spi + + self._dequeue_thread = None + self._syncword = None self.configure(config) if len(self._register_map) != 53: raise ValueError('Inconsistent register map!') + + self._channel_queue = {} return None def __del__(self): self._spi.close() @@ -234,16 +246,33 @@ def _debug(self, message): if self._config is not None and 'debug_log_command' in self._config: self._config['debug_log_command'](message) return None + + def _get_mutex(self, real_mutex = True): + if not real_mutex: + return dummy_context_mgr() + + mutex = self._config.get('mutex', dummy_context_mgr()) + return mutex def _reset_device(self): - if self._config is not None: - if self._config['reset_command'] is not None: - return self._config['reset_command']() + reset_command = self._config.get('reset_command', None) + + if reset_command is None: + return None + + reset_command() + return None + + def _should_use_queue(self): + if 'use_software_tx_queue' in self._config: + return self._config['use_software_tx_queue'] + + return False def _register_name(self, reg_number): return self._register_map[reg_number]['name'] def _register_number(self, reg_string): @@ -308,10 +337,11 @@ return True def _put_register_high_low(self, reg, high, low, delay = 7): reg = self._register_number(reg) + result = self._spi.xfer([reg, high, low], self._spi.max_speed_hz, delay) if reg & 0x80 == 0x80: self._debug(" regRead[%02X] = %s" % ((reg & 0x7f), result)) else: @@ -384,22 +414,33 @@ # Return the filled in structure return result def configure(self, config): + if config is None: + config = {} + self._config = config - if config is None: - return None - - self._spi.max_speed_hz = self._config.get('frequency', 4000000) - self._spi.bits_per_word = self._config.get('bits_per_word', 8) - self._spi.cshigh = self._config.get('csigh', False) - self._spi.no_cs = self._config.get('no_cs', False) - self._spi.lsbfirst = self._config.get('lsbfirst', False) - self._spi.threewire = self._config.get('threewire', False) - self._spi.mode = self._config.get('mode', 1) + with self._get_mutex(): + self._spi.max_speed_hz = self._config.get('frequency', 4000000) + self._spi.bits_per_word = self._config.get('bits_per_word', 8) + self._spi.cshigh = self._config.get('csigh', False) + self._spi.no_cs = self._config.get('no_cs', False) + self._spi.lsbfirst = self._config.get('lsbfirst', False) + self._spi.threewire = self._config.get('threewire', False) + self._spi.mode = self._config.get('mode', 1) + + # If using a queue, start a thread to run the queue + if self._should_use_queue(): + if self._dequeue_thread is None: + self._dequeue_thread = threading.Thread(target = self._run_queue) + self._dequeue_thread.start() + else: + if self._dequeue_thread is not None: + self._dequeue_thread.join() + self._dequeue_thread = None return None def initialize(self): self._reset_device() @@ -416,11 +457,24 @@ self.put_register_bits('radio_state', state) return state - def set_syncword(self, syncword): + def set_syncword(self, syncword, force = False, _queue_instead_of_xmit = True): + # If queuing is being used, just store this message + if _queue_instead_of_xmit and self._should_use_queue(): + self._enqueue(syncword, None, None) + return None + + # Do not set the syncword again if it's not needed + if not force: + if self._syncword is not None: + if syncword == self._syncword: + return None + + self._syncword = syncword + packet_config = self.get_register_bits('packet_config') packet_config['syncword_len'] = len(syncword) - 1 self.put_register_bits('packet_config', packet_config) @@ -441,71 +495,189 @@ elif len(syncword) > 4: raise ValueError("SyncWord length must be less than 5") return None - def fill_fifo(self, message, include_length = True): + def fill_fifo(self, message, include_length = True, lock = True): new_message = [self._register_number('fifo')] if include_length: new_message = new_message + [len(message)] new_message = new_message + message log_message = [] + new_message # Transfer the message - result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10) + with self._get_mutex(lock): + result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10) + self._debug("Writing: {} = {}".format(log_message, result)) return new_message - def transmit(self, message, channel = None): - if channel is None: - state = self.get_register_bits('radio_state') - channel = state['channel'] - - # Initialize the transmitter - self.put_register_bits('radio_state', { - 'tx_enabled': 0, - 'rx_enabled': 0, - 'channel': 0 - }) - - self.put_register_bits('fifo_state', { - 'clear_read': 1, - 'clear_write': 1 - }) - - # Format message to send to fifo - self.fill_fifo(message, True) - - # Tell the radio to transmit the FIFO buffer to the specified channel - self.put_register_bits('radio_state', { - 'tx_enabled': 1, - 'rx_enabled': 0, - 'channel': channel - }) - - # Wait for buffer to empty - # XXX: Untested - while True: - radio_status = self.get_register_bits('status') - self._debug("radio_status={}".format(radio_status)) - - if radio_status['packet_flag'] == 1: - break - time.sleep(0.001) - - return True - - def multi_transmit(self, message, channels, retries = 3, delay = 0.1): - for channel in channels: - for i in range(retries): - if not self.transmit(message, channel): - return False - if delay != 0: - time.sleep(delay / retries) - return True - + def transmit(self, message, channel = None, lock = True, _queue_instead_of_xmit = True, post_delay = 0, syncword = None): + # If we are using a radio transmit queue, just queue this message + # (unless we are called from the dequeue procedure) + if _queue_instead_of_xmit and self._should_use_queue(): + if syncword is None: + syncword = self._syncword + self._enqueue(syncword, message, channel, post_delay = post_delay) + return True + + sent_packet = True + + with self._get_mutex(lock): + # Set the syncword + if syncword is not None: + self.set_syncword(syncword, _queue_instead_of_xmit = False) + + if channel is None: + state = self.get_register_bits('radio_state') + channel = state['channel'] + + # Initialize the transmitter + self.put_register_bits('radio_state', { + 'tx_enabled': 0, + 'rx_enabled': 0, + 'channel': 0 + }) + + self.put_register_bits('fifo_state', { + 'clear_read': 1, + 'clear_write': 1 + }) + + # Format message to send to fifo + self.fill_fifo(message, include_length = True, lock = False) + + # Tell the radio to transmit the FIFO buffer to the specified channel + self.put_register_bits('radio_state', { + 'tx_enabled': 1, + 'rx_enabled': 0, + 'channel': channel + }) + + # Wait for buffer to empty + # XXX: Untested + while True: + radio_status = self.get_register_bits('status') + self._debug("radio_status={}".format(radio_status)) + + if radio_status['packet_flag'] == 1: + break + + if radio_status['framer_status'] == 0: + sent_packet = False + break + time.sleep(0.001) + + if post_delay != 0: + time.sleep(post_delay) + + return sent_packet + + def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None): + delay = delay / len(channels) + for channel in channels: + for i in range(retries): + if not self.transmit(message, channel, post_delay = delay, syncword = syncword): + return False + + return True + + def _enqueue(self, syncword, message, channel, post_delay = 0.01): + if channel not in self._channel_queue: + self._channel_queue[channel] = [] + dispatch_time = time.time() + else: + last_item = self._channel_queue[channel][-1] + dispatch_time = last_item['time'] + last_item['post_delay'] + + self._channel_queue[channel].append({ + 'syncword': syncword, + 'message': message, + 'channel': channel, + 'time': dispatch_time, + 'post_delay': post_delay, + 'transmitted': False + }) + + return None + + def _run_queue(self): + self._debug("Started run_queue process") + # Do not run the queue if we are using a queue + if not self._should_use_queue(): + self._debug("Ending run_queue process, not using the queue") + return None + + sleep_time = 0 + while True: + time.sleep(sleep_time) + item_count = self._run_queue_once() + self._debug("Running the queue, {} items left".format(item_count)) + + if item_count == 0: + # If the queue is empty and we are no longer queuing + # events, exit this function (which should be joined) + if not self._should_use_queue(): + self._debug("Request to stop run_queue process, exiting") + break + + # If there are no events, wait a bit + # longer before trying again + sleep_time = 0.5 + continue + + # If there are more events to process, try again in 1ms + sleep_time = 0.001 + + return None + + def _run_queue_once(self): + now = time.time() + to_transmit = [] + items_remaining = 0 + for channel in self._channel_queue: + found_item_in_queue = False + for item in self._channel_queue[channel]: + if item['transmitted']: + continue + + found_item_in_queue = True + if item['time'] <= now: + item['transmitted'] = True + to_transmit.append(item) + continue + + items_remaining += 1 + # If the channel has only transmitted items, clear it + # out except for the last item (which will have a + # record of when the next event should take place) + if not found_item_in_queue: + self._channel_queue[channel] = [self._channel_queue[channel][-1]] + + default_syncword = None + + with self._get_mutex(): + for item in to_transmit: + syncword = item['syncword'] + message = item['message'] + channel = item['channel'] + + if syncword is not None: + default_syncword = syncword + else: + syncword = default_syncword + + if message is None or channel is None: + continue + + for retry in range(3): + if self.transmit(message, channel, lock = False, _queue_instead_of_xmit = False, syncword = syncword): + break + + return items_remaining + def start_listening(self, channel): # Initialize the receiver self.stop_listening() # Go into listening mode @@ -531,58 +703,60 @@ }) return True def receive(self, channel = None, wait = False, length = None, wait_time = 0.1): - if wait: - if channel is None: - state = self.get_register_bits('radio_state') - channel = state['channel'] - - self.start_listening(channel) - - message = [] - - while True: - radio_status = self.get_register_bits('status') - self._debug("radio_status={}".format(radio_status)) - if radio_status['packet_flag'] == 0: - if wait: - time.sleep(wait_time) - continue - else: - return None - - if radio_status['crc_error'] == 1: - # Handle invalid packet ? - self.start_listening(channel) - continue - - # Data is available, read it from the FIFO register - # The first result will include the length - # XXX *IF* length encoding is enabled ? - fifo_data = self.get_register('fifo') - message_length = fifo_data >> 8 - - if message_length == 0: - self.start_listening(channel) - continue - - # Keep track of the total message length to truncate it - final_message_length = message_length - - message += [fifo_data & 0xff] - message_length -= 1 - - # Read subsequent bytes from the FIFO register until - # there are no more bytes to read - while message_length > 0: - fifo_data = self.get_register('fifo') - message += [fifo_data >> 8, fifo_data & 0xff] - message_length -= 2 - - # Truncate the message to its final size, since we have - # to read in 16-bit words, we may have an extra byte - message = message[0:final_message_length] - break + with self._get_mutex(): + if wait: + if channel is None: + state = self.get_register_bits('radio_state') + channel = state['channel'] + + self.start_listening(channel) + + message = [] + + while True: + radio_status = self.get_register_bits('status') + self._debug("radio_status={}".format(radio_status)) + if radio_status['packet_flag'] == 0: + if wait: + time.sleep(wait_time) + continue + else: + self._unlock_radio() + return None + + if radio_status['crc_error'] == 1: + # Handle invalid packet ? + self.start_listening(channel) + continue + + # Data is available, read it from the FIFO register + # The first result will include the length + # XXX *IF* length encoding is enabled ? + fifo_data = self.get_register('fifo') + message_length = fifo_data >> 8 + + if message_length == 0: + self.start_listening(channel) + continue + + # Keep track of the total message length to truncate it + final_message_length = message_length + + message += [fifo_data & 0xff] + message_length -= 1 + + # Read subsequent bytes from the FIFO register until + # there are no more bytes to read + while message_length > 0: + fifo_data = self.get_register('fifo') + message += [fifo_data >> 8, fifo_data & 0xff] + message_length -= 2 + + # Truncate the message to its final size, since we have + # to read in 16-bit words, we may have an extra byte + message = message[0:final_message_length] + break return message