Index: lt8900_spi/__init__.py ================================================================== --- lt8900_spi/__init__.py +++ lt8900_spi/__init__.py @@ -228,22 +228,24 @@ spi = spidev.SpiDev() spi.open(spi_bus, spi_dev) self._spi = spi self._dequeue_thread = None - self._syncword = None + self._last_syncword = None - self.configure(config) + self.configure(config, update = False) if len(self._register_map) != 53: raise ValueError('Inconsistent register map!') - self._channel_queue = {} + self._software_tx_queue = {} return None def __del__(self): + self._debug('Deleting object') + self._config['use_software_tx_queue'] = False self._spi.close() def _debug(self, message): if self._config is not None and 'debug_log_command' in self._config: self._config['debug_log_command'](message) @@ -413,15 +415,18 @@ result[key] = key_value # Return the filled in structure return result - def configure(self, config): + def configure(self, config, update = True): if config is None: config = {} - self._config = config + if update: + self._config.update(config) + else: + self._config = config with self._get_mutex(): self._spi.max_speed_hz = self._config.get('frequency', 4000000) self._spi.bits_per_word = self._config.get('bits_per_word', 8) self._spi.cshigh = self._config.get('csigh', False) @@ -431,16 +436,19 @@ self._spi.mode = self._config.get('mode', 1) # If using a queue, start a thread to run the queue if self._should_use_queue(): if self._dequeue_thread is None: - self._dequeue_thread = threading.Thread(target = self._run_queue) + self._dequeue_thread = threading.Thread(target = self._run_queue, daemon = True) + self._software_tx_queue_mutex = threading.Lock() self._dequeue_thread.start() else: if self._dequeue_thread is not None: + self._debug("Joining existing thread to wait for termination") self._dequeue_thread.join() self._dequeue_thread = None + self._software_tx_queue_mutex = None return None def initialize(self): self._reset_device() @@ -457,23 +465,23 @@ self.put_register_bits('radio_state', state) return state - def set_syncword(self, syncword, force = False, _queue_instead_of_xmit = True): + def set_syncword(self, syncword, force = False, submit_queue = '__DEFAULT__'): # If queuing is being used, just store this message - if _queue_instead_of_xmit and self._should_use_queue(): - self._enqueue(syncword, None, None) + if submit_queue is not None and self._should_use_queue(): + self._enqueue(submit_queue, syncword, None, None, post_delay = 0) return None # Do not set the syncword again if it's not needed if not force: - if self._syncword is not None: - if syncword == self._syncword: + if self._last_syncword is not None: + if syncword == self._last_syncword: return None - self._syncword = syncword + self._last_syncword = syncword packet_config = self.get_register_bits('packet_config') packet_config['syncword_len'] = len(syncword) - 1 self.put_register_bits('packet_config', packet_config) @@ -510,25 +518,25 @@ self._debug("Writing: {} = {}".format(log_message, result)) return new_message - def transmit(self, message, channel = None, lock = True, _queue_instead_of_xmit = True, post_delay = 0, syncword = None): + def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'): # If we are using a radio transmit queue, just queue this message # (unless we are called from the dequeue procedure) - if _queue_instead_of_xmit and self._should_use_queue(): + if submit_queue is not None and self._should_use_queue(): if syncword is None: - syncword = self._syncword - self._enqueue(syncword, message, channel, post_delay = post_delay) + syncword = self._last_syncword + self._enqueue(submit_queue, syncword, message, channel, post_delay = post_delay) return True sent_packet = True with self._get_mutex(lock): # Set the syncword if syncword is not None: - self.set_syncword(syncword, _queue_instead_of_xmit = False) + self.set_syncword(syncword, submit_queue = None) if channel is None: state = self.get_register_bits('radio_state') channel = state['channel'] @@ -567,61 +575,64 @@ sent_packet = False break time.sleep(0.001) if post_delay != 0: + raise ValueError('No sleeping allowed') # XXX time.sleep(post_delay) return sent_packet - def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None): + def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'): delay = delay / len(channels) for channel in channels: for i in range(retries): - if not self.transmit(message, channel, post_delay = delay, syncword = syncword): + if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue): return False return True - def _enqueue(self, syncword, message, channel, post_delay = 0.01): - if channel not in self._channel_queue: - self._channel_queue[channel] = [] - dispatch_time = time.time() - else: - last_item = self._channel_queue[channel][-1] - dispatch_time = last_item['time'] + last_item['post_delay'] - - self._channel_queue[channel].append({ - 'syncword': syncword, - 'message': message, - 'channel': channel, - 'time': dispatch_time, - 'post_delay': post_delay, - 'transmitted': False - }) + def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0): + with self._software_tx_queue_mutex: + if submit_queue not in self._software_tx_queue: + self._software_tx_queue[submit_queue] = [] + + if len(self._software_tx_queue[submit_queue]) == 0: + dispatch_time = time.time() + else: + last_item = self._software_tx_queue[submit_queue][-1] + dispatch_time = last_item['time'] + last_item['post_delay'] + + self._software_tx_queue[submit_queue].append({ + 'syncword': syncword, + 'message': message, + 'channel': channel, + 'time': dispatch_time, + 'post_delay': post_delay + }) return None def _run_queue(self): self._debug("Started run_queue process") - # Do not run the queue if we are using a queue - if not self._should_use_queue(): - self._debug("Ending run_queue process, not using the queue") - return None sleep_time = 0 while True: + self._debug("Sleeping for {} seconds".format(sleep_time)) time.sleep(sleep_time) + with self._software_tx_queue_mutex: + for queue in self._software_tx_queue: + self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue]))) item_count = self._run_queue_once() - self._debug("Running the queue, {} items left".format(item_count)) + self._debug("Completed running the queue, {} items left (continue queue = {})".format(item_count, self._should_use_queue())) if item_count == 0: # If the queue is empty and we are no longer queuing # events, exit this function (which should be joined) if not self._should_use_queue(): self._debug("Request to stop run_queue process, exiting") - break + return None # If there are no events, wait a bit # longer before trying again sleep_time = 0.5 continue @@ -632,34 +643,29 @@ return None def _run_queue_once(self): now = time.time() to_transmit = [] - items_remaining = 0 - for channel in self._channel_queue: - found_item_in_queue = False - for item in self._channel_queue[channel]: - if item['transmitted']: - continue - - found_item_in_queue = True - if item['time'] <= now: - item['transmitted'] = True - to_transmit.append(item) - continue - - items_remaining += 1 - # If the channel has only transmitted items, clear it - # out except for the last item (which will have a - # record of when the next event should take place) - if not found_item_in_queue: - self._channel_queue[channel] = [self._channel_queue[channel][-1]] + remaining_items = 0 + with self._software_tx_queue_mutex: + for submit_queue in self._software_tx_queue: + new_software_tx_queue = [] + for item in self._software_tx_queue[submit_queue]: + if item['time'] <= now: + item['from_queue'] = submit_queue + to_transmit.append(item) + continue + else: + new_software_tx_queue.append(item) + remaining_items += 1 + self._software_tx_queue[submit_queue] = new_software_tx_queue default_syncword = None with self._get_mutex(): for item in to_transmit: + self._debug("Transmitting item {}".format(item)) syncword = item['syncword'] message = item['message'] channel = item['channel'] if syncword is not None: @@ -669,14 +675,14 @@ if message is None or channel is None: continue for retry in range(3): - if self.transmit(message, channel, lock = False, _queue_instead_of_xmit = False, syncword = syncword): + if self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword): break - return items_remaining + return remaining_items def start_listening(self, channel): # Initialize the receiver self.stop_listening()