Python LT8900 via SPI

Check-in [5f845c3e3c]
Login
Overview
Comment:Substantial improvement to software queue management
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 5f845c3e3c2336922689a0b86aa9e9ebb5bbc5dac37fdeac70db4233953b020b
User & Date: rkeene on 2020-05-02 15:10:33
Other Links: manifest | tags
Context
2020-05-02
15:12
lt8900_spi v2.3 check-in: 049d34a933 user: rkeene tags: 2.3, trunk
15:10
Substantial improvement to software queue management check-in: 5f845c3e3c user: rkeene tags: trunk
2020-05-01
01:33
Added support for multiple queues, and ensured queue management is locked check-in: 089d2c0ce8 user: rkeene tags: trunk
Changes

Modified lt8900_spi/__init__.py from [3c959f71b4] to [dd13b908ed].

18
19
20
21
22
23
24

25
26
27
28
29
30
31
# | P1-17 | P1-18 | P1-21 | P1-19 | P1-23 | P1-24 | P1-25  |
# -________________________________________________________-
#                     Raspberry Pi

import spidev
import time
import threading


class dummy_context_mgr():
	def __enter__(self):
		return None
	def __exit__(self, exc_type, exc_value, traceback):
		return False








>







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# | P1-17 | P1-18 | P1-21 | P1-19 | P1-23 | P1-24 | P1-25  |
# -________________________________________________________-
#                     Raspberry Pi

import spidev
import time
import threading
import collections

class dummy_context_mgr():
	def __enter__(self):
		return None
	def __exit__(self, exc_type, exc_value, traceback):
		return False

228
229
230
231
232
233
234



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252









253
254
255
256
257
258
259
		spi = spidev.SpiDev()
		spi.open(spi_bus, spi_dev)
		self._spi = spi

		self._dequeue_thread = None
		self._last_syncword = None




		self.configure(config, update = False)

		if len(self._register_map) != 53:
			raise ValueError('Inconsistent register map!')

		self._software_tx_queue = {}

		return None

	def __del__(self):
		self._debug('Deleting object')
		self._config['use_software_tx_queue'] = False
		self._spi.close()

	def _debug(self, message):
		if self._config is not None and 'debug_log_command' in self._config:
			self._config['debug_log_command'](message)
		return None










	def _get_mutex(self, real_mutex = True):
		if not real_mutex:
			return dummy_context_mgr()

		mutex = self._config.get('mutex', dummy_context_mgr())
		return mutex







>
>
>





<
<








|


>
>
>
>
>
>
>
>
>







229
230
231
232
233
234
235
236
237
238
239
240
241
242
243


244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
		spi = spidev.SpiDev()
		spi.open(spi_bus, spi_dev)
		self._spi = spi

		self._dequeue_thread = None
		self._last_syncword = None

		self._software_tx_queue = {}
		self._software_tx_queue_next_time = {}

		self.configure(config, update = False)

		if len(self._register_map) != 53:
			raise ValueError('Inconsistent register map!')



		return None

	def __del__(self):
		self._debug('Deleting object')
		self._config['use_software_tx_queue'] = False
		self._spi.close()

	def _debug(self, message):
		if 'debug_log_command' in self._config:
			self._config['debug_log_command'](message)
		return None

	def _error(self, message):
		log_command = None
		if 'error_log_command' in self._config:
			log_command = self._config['error_log_command']
		elif 'debug_log_command' in self._config:
			log_command = self._config['debug_log_command']
		log_command(message)
		return None

	def _get_mutex(self, real_mutex = True):
		if not real_mutex:
			return dummy_context_mgr()

		mutex = self._config.get('mutex', dummy_context_mgr())
		return mutex
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
		if include_length:
			new_message = new_message + [len(message)]
		new_message = new_message + message
		log_message = [] + new_message

		# Transfer the message
		with self._get_mutex(lock):
			result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10)

		self._debug("Writing: {} = {}".format(log_message, result))

		return new_message

	def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'):
		# If we are using a radio transmit queue, just queue this message







|







521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
		if include_length:
			new_message = new_message + [len(message)]
		new_message = new_message + message
		log_message = [] + new_message

		# Transfer the message
		with self._get_mutex(lock):
			result = self._spi.xfer(new_message, self._spi.max_speed_hz, 0)

		self._debug("Writing: {} = {}".format(log_message, result))

		return new_message

	def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'):
		# If we are using a radio transmit queue, just queue this message
573
574
575
576
577
578
579
580
581
582
583
584
585
586


587



588


589
590
591
592
593
594



595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619

620
621

622
623

624


625




626

627
628
629
630
631
632
633
634
635
636
637
638
639









640
641
642
643
644
645
646
647
648

649
650

651
652
653


654
655
656
657


658
659















660
661
662
663

664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690

				if radio_status['framer_status'] == 0:
					sent_packet = False
					break
				time.sleep(0.001)

		if post_delay != 0:
			raise ValueError('No sleeping allowed') # XXX
			time.sleep(post_delay)

		return sent_packet

	def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'):
		delay = delay / len(channels)


		for channel in channels:



			for i in range(retries):


				if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
					return False

		return True

	def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0):



		with self._software_tx_queue_mutex:
			if submit_queue not in self._software_tx_queue:
				self._software_tx_queue[submit_queue] = []

			if len(self._software_tx_queue[submit_queue]) == 0:
				dispatch_time = time.time()
			else:
				last_item = self._software_tx_queue[submit_queue][-1]
				dispatch_time = last_item['time'] + last_item['post_delay']

			self._software_tx_queue[submit_queue].append({
				'syncword': syncword,
				'message': message,
				'channel': channel,
				'time': dispatch_time,
				'post_delay': post_delay
			})

		return None

	def _run_queue(self):
		self._debug("Started run_queue process")

		sleep_time = 0
		while True:

			self._debug("Sleeping for {} seconds".format(sleep_time))
			time.sleep(sleep_time)

			with self._software_tx_queue_mutex:
				for queue in self._software_tx_queue:

					self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue])))


			item_count = self._run_queue_once()




			self._debug("Completed running the queue, {} items left (continue queue = {})".format(item_count, self._should_use_queue()))


			if item_count == 0:
				# If the queue is empty and we are no longer queuing
				# events, exit this function (which should be joined)
				if not self._should_use_queue():
					self._debug("Request to stop run_queue process, exiting")
					return None

				# If there are no events, wait a bit
				# longer before trying again
				sleep_time = 0.5
				continue










			# If there are more events to process, try again in 1ms
			sleep_time = 0.001

		return None

	def _run_queue_once(self):
		now = time.time()
		to_transmit = []
		remaining_items = 0

		with self._software_tx_queue_mutex:
			for submit_queue in self._software_tx_queue:

				new_software_tx_queue = []
				for item in self._software_tx_queue[submit_queue]:
					if item['time'] <= now:


						item['from_queue'] = submit_queue
						to_transmit.append(item)
						continue
					else:


						new_software_tx_queue.append(item)
						remaining_items += 1















				self._software_tx_queue[submit_queue] = new_software_tx_queue

		default_syncword = None


		with self._get_mutex():
			for item in to_transmit:
				self._debug("Transmitting item {}".format(item))
				syncword = item['syncword']
				message = item['message']
				channel = item['channel']

				if syncword is not None:
					default_syncword = syncword
				else:
					syncword = default_syncword

				if message is None or channel is None:
					continue

				for retry in range(3):
					if self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword):
						break

		return remaining_items
		
	def start_listening(self, channel):
		# Initialize the receiver
		self.stop_listening()

		# Go into listening mode
		self.put_register_bits('radio_state', {







<





|
>
>
|
>
>
>

>
>
|
|




>
>
>


|
<
<
<
<
<
<





<










>
|
|
>


>
|
>
>
|
>
>
>
>
|
>

|











>
>
>
>
>
>
>
>
>
|
|




<


>


>
|
|
|
>
>
|
<
|
|
>
>
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|



>















<
|
<

|







584
585
586
587
588
589
590

591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617






618
619
620
621
622

623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677

678
679
680
681
682
683
684
685
686
687
688
689

690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730

731

732
733
734
735
736
737
738
739
740

				if radio_status['framer_status'] == 0:
					sent_packet = False
					break
				time.sleep(0.001)

		if post_delay != 0:

			time.sleep(post_delay)

		return sent_packet

	def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'):
		if len(channels) == 0 or retries == 0:
			self._error("Asked to send the message {} a total of zero times ({} channels, {} retries)".format(message, channels, retries))

		for channel_idx in range(len(channels)):
			if channel_idx == (len(channels) - 1):
				retries -= 1
			channel = channels[channel_idx]
			for i in range(retries):
				if not self.transmit(message, channel, post_delay = 0, syncword = syncword, submit_queue = submit_queue):
					return False
		if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
			return False

		return True

	def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0):
		if not self._should_use_queue():
			raise ValueError('internal error: _enqueue called with queueing disabled')

		with self._software_tx_queue_mutex:
			if submit_queue not in self._software_tx_queue:
				self._software_tx_queue[submit_queue] = collections.deque([])







			self._software_tx_queue[submit_queue].append({
				'syncword': syncword,
				'message': message,
				'channel': channel,

				'post_delay': post_delay
			})

		return None

	def _run_queue(self):
		self._debug("Started run_queue process")

		sleep_time = 0
		while True:
			if sleep_time != 0:
				self._debug("Sleeping for {} seconds".format(sleep_time))
				time.sleep(sleep_time)

			with self._software_tx_queue_mutex:
				for queue in self._software_tx_queue:
					if len(self._software_tx_queue[queue]) != 0:
						self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue])))

			try:
				[processed_items, remaining_items] = self._run_queue_once()
			except Exception as error_info:
				self._error("Failed to run queue: {}".format(error_info.args))
				processed_items = 0
				remaining_items = 0

			self._debug("Completed running the queue, did {} items and {} items left (continue queue = {})".format(processed_items, remaining_items, self._should_use_queue()))

			if remaining_items == 0:
				# If the queue is empty and we are no longer queuing
				# events, exit this function (which should be joined)
				if not self._should_use_queue():
					self._debug("Request to stop run_queue process, exiting")
					return None

				# If there are no events, wait a bit
				# longer before trying again
				sleep_time = 0.5
				continue

			if processed_items == 0:
				# If we processed no items, but there are items left to
				# process, back off slightly
				if sleep_time == 0:
					sleep_time = 0.001
				else:
					sleep_time = min(0.5, sleep_time * 2)
				continue

			# If there are more events to process, try again
			sleep_time = 0

		return None

	def _run_queue_once(self):

		to_transmit = []
		remaining_items = 0
		now = time.time()
		with self._software_tx_queue_mutex:
			for submit_queue in self._software_tx_queue:
				# Determine if we should run this queue yet
				if submit_queue not in self._software_tx_queue_next_time:
					self._software_tx_queue_next_time[submit_queue] = now

				queue_next_time = self._software_tx_queue_next_time[submit_queue]
				if now < queue_next_time:
					remaining_items += len(self._software_tx_queue[submit_queue])

					continue

				# Record how many items to pop off this queue
				pop_items = 0
				for item in self._software_tx_queue[submit_queue]:
					pop_items += 1

					# If the last item we're about to transmit requires a delay, make
					# a note of it in the queue time and don't pull anything else
					# from this queue
					self._software_tx_queue_next_time[submit_queue] = now + item['post_delay']
					if item['post_delay'] != 0:
						break

				# Pop off the items to transmit in this run into a list
				if pop_items != 0:
					self._debug("Found {} items to transmit in the {} queue".format(pop_items, submit_queue))
				while pop_items != 0:
					to_transmit.append(self._software_tx_queue[submit_queue].popleft())
					pop_items -= 1

				remaining_items += len(self._software_tx_queue[submit_queue])

		default_syncword = None

		self._debug("Getting ready to transmit {} items".format(len(to_transmit)))
		with self._get_mutex():
			for item in to_transmit:
				self._debug("Transmitting item {}".format(item))
				syncword = item['syncword']
				message = item['message']
				channel = item['channel']

				if syncword is not None:
					default_syncword = syncword
				else:
					syncword = default_syncword

				if message is None or channel is None:
					continue


				self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword, post_delay = 0)


		return [len(to_transmit), remaining_items]
		
	def start_listening(self, channel):
		# Initialize the receiver
		self.stop_listening()

		# Go into listening mode
		self.put_register_bits('radio_state', {