Index: lt8900_spi/__init__.py ================================================================== --- lt8900_spi/__init__.py +++ lt8900_spi/__init__.py @@ -20,10 +20,11 @@ # Raspberry Pi import spidev import time import threading +import collections class dummy_context_mgr(): def __enter__(self): return None def __exit__(self, exc_type, exc_value, traceback): @@ -230,28 +231,38 @@ self._spi = spi self._dequeue_thread = None self._last_syncword = None + self._software_tx_queue = {} + self._software_tx_queue_next_time = {} + self.configure(config, update = False) if len(self._register_map) != 53: raise ValueError('Inconsistent register map!') - self._software_tx_queue = {} - return None def __del__(self): self._debug('Deleting object') self._config['use_software_tx_queue'] = False self._spi.close() def _debug(self, message): - if self._config is not None and 'debug_log_command' in self._config: + if 'debug_log_command' in self._config: self._config['debug_log_command'](message) return None + + def _error(self, message): + log_command = None + if 'error_log_command' in self._config: + log_command = self._config['error_log_command'] + elif 'debug_log_command' in self._config: + log_command = self._config['debug_log_command'] + log_command(message) + return None def _get_mutex(self, real_mutex = True): if not real_mutex: return dummy_context_mgr() @@ -512,11 +523,11 @@ new_message = new_message + message log_message = [] + new_message # Transfer the message with self._get_mutex(lock): - result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10) + result = self._spi.xfer(new_message, self._spi.max_speed_hz, 0) self._debug("Writing: {} = {}".format(log_message, result)) return new_message @@ -575,40 +586,42 @@ sent_packet = False break time.sleep(0.001) if post_delay != 0: - raise ValueError('No sleeping allowed') # XXX time.sleep(post_delay) return sent_packet def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'): - delay = delay / len(channels) - for channel in channels: + if len(channels) == 0 or retries == 0: + self._error("Asked to send the message {} a total of zero times ({} channels, {} retries)".format(message, channels, retries)) + + for channel_idx in range(len(channels)): + if channel_idx == (len(channels) - 1): + retries -= 1 + channel = channels[channel_idx] for i in range(retries): - if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue): + if not self.transmit(message, channel, post_delay = 0, syncword = syncword, submit_queue = submit_queue): return False + if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue): + return False return True def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0): + if not self._should_use_queue(): + raise ValueError('internal error: _enqueue called with queueing disabled') + with self._software_tx_queue_mutex: if submit_queue not in self._software_tx_queue: - self._software_tx_queue[submit_queue] = [] - - if len(self._software_tx_queue[submit_queue]) == 0: - dispatch_time = time.time() - else: - last_item = self._software_tx_queue[submit_queue][-1] - dispatch_time = last_item['time'] + last_item['post_delay'] + self._software_tx_queue[submit_queue] = collections.deque([]) self._software_tx_queue[submit_queue].append({ 'syncword': syncword, 'message': message, 'channel': channel, - 'time': dispatch_time, 'post_delay': post_delay }) return None @@ -615,19 +628,29 @@ def _run_queue(self): self._debug("Started run_queue process") sleep_time = 0 while True: - self._debug("Sleeping for {} seconds".format(sleep_time)) - time.sleep(sleep_time) + if sleep_time != 0: + self._debug("Sleeping for {} seconds".format(sleep_time)) + time.sleep(sleep_time) + with self._software_tx_queue_mutex: for queue in self._software_tx_queue: - self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue]))) - item_count = self._run_queue_once() - self._debug("Completed running the queue, {} items left (continue queue = {})".format(item_count, self._should_use_queue())) + if len(self._software_tx_queue[queue]) != 0: + self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue]))) + + try: + [processed_items, remaining_items] = self._run_queue_once() + except Exception as error_info: + self._error("Failed to run queue: {}".format(error_info.args)) + processed_items = 0 + remaining_items = 0 + + self._debug("Completed running the queue, did {} items and {} items left (continue queue = {})".format(processed_items, remaining_items, self._should_use_queue())) - if item_count == 0: + if remaining_items == 0: # If the queue is empty and we are no longer queuing # events, exit this function (which should be joined) if not self._should_use_queue(): self._debug("Request to stop run_queue process, exiting") return None @@ -635,34 +658,63 @@ # If there are no events, wait a bit # longer before trying again sleep_time = 0.5 continue - # If there are more events to process, try again in 1ms - sleep_time = 0.001 + if processed_items == 0: + # If we processed no items, but there are items left to + # process, back off slightly + if sleep_time == 0: + sleep_time = 0.001 + else: + sleep_time = min(0.5, sleep_time * 2) + continue + + # If there are more events to process, try again + sleep_time = 0 return None def _run_queue_once(self): - now = time.time() to_transmit = [] remaining_items = 0 + now = time.time() with self._software_tx_queue_mutex: for submit_queue in self._software_tx_queue: - new_software_tx_queue = [] + # Determine if we should run this queue yet + if submit_queue not in self._software_tx_queue_next_time: + self._software_tx_queue_next_time[submit_queue] = now + + queue_next_time = self._software_tx_queue_next_time[submit_queue] + if now < queue_next_time: + remaining_items += len(self._software_tx_queue[submit_queue]) + continue + + # Record how many items to pop off this queue + pop_items = 0 for item in self._software_tx_queue[submit_queue]: - if item['time'] <= now: - item['from_queue'] = submit_queue - to_transmit.append(item) - continue - else: - new_software_tx_queue.append(item) - remaining_items += 1 - self._software_tx_queue[submit_queue] = new_software_tx_queue + pop_items += 1 + + # If the last item we're about to transmit requires a delay, make + # a note of it in the queue time and don't pull anything else + # from this queue + self._software_tx_queue_next_time[submit_queue] = now + item['post_delay'] + if item['post_delay'] != 0: + break + + # Pop off the items to transmit in this run into a list + if pop_items != 0: + self._debug("Found {} items to transmit in the {} queue".format(pop_items, submit_queue)) + while pop_items != 0: + to_transmit.append(self._software_tx_queue[submit_queue].popleft()) + pop_items -= 1 + + remaining_items += len(self._software_tx_queue[submit_queue]) default_syncword = None + self._debug("Getting ready to transmit {} items".format(len(to_transmit))) with self._get_mutex(): for item in to_transmit: self._debug("Transmitting item {}".format(item)) syncword = item['syncword'] message = item['message'] @@ -674,15 +726,13 @@ syncword = default_syncword if message is None or channel is None: continue - for retry in range(3): - if self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword): - break + self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword, post_delay = 0) - return remaining_items + return [len(to_transmit), remaining_items] def start_listening(self, channel): # Initialize the receiver self.stop_listening()