Python LT8900 via SPI

Check-in [5f845c3e3c]
Login

Check-in [5f845c3e3c]

Overview
Comment:Substantial improvement to software queue management
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 5f845c3e3c2336922689a0b86aa9e9ebb5bbc5dac37fdeac70db4233953b020b
User & Date: rkeene on 2020-05-02 15:10:33
Other Links: manifest | tags
Context
2020-05-02
15:12
lt8900_spi v2.3 check-in: 049d34a933 user: rkeene tags: 2.3, trunk
15:10
Substantial improvement to software queue management check-in: 5f845c3e3c user: rkeene tags: trunk
2020-05-01
01:33
Added support for multiple queues, and ensured queue management is locked check-in: 089d2c0ce8 user: rkeene tags: trunk
Changes

Modified lt8900_spi/__init__.py from [3c959f71b4] to [dd13b908ed].

18
19
20
21
22
23
24

25
26
27
28
29
30
31
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32







+







# | P1-17 | P1-18 | P1-21 | P1-19 | P1-23 | P1-24 | P1-25  |
# -________________________________________________________-
#                     Raspberry Pi

import spidev
import time
import threading
import collections

class dummy_context_mgr():
	def __enter__(self):
		return None
	def __exit__(self, exc_type, exc_value, traceback):
		return False

228
229
230
231
232
233
234



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250

251
252









253
254
255
256
257
258
259
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243


244
245
246
247
248
249
250
251

252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270







+
+
+





-
-








-
+


+
+
+
+
+
+
+
+
+







		spi = spidev.SpiDev()
		spi.open(spi_bus, spi_dev)
		self._spi = spi

		self._dequeue_thread = None
		self._last_syncword = None

		self._software_tx_queue = {}
		self._software_tx_queue_next_time = {}

		self.configure(config, update = False)

		if len(self._register_map) != 53:
			raise ValueError('Inconsistent register map!')

		self._software_tx_queue = {}

		return None

	def __del__(self):
		self._debug('Deleting object')
		self._config['use_software_tx_queue'] = False
		self._spi.close()

	def _debug(self, message):
		if self._config is not None and 'debug_log_command' in self._config:
		if 'debug_log_command' in self._config:
			self._config['debug_log_command'](message)
		return None

	def _error(self, message):
		log_command = None
		if 'error_log_command' in self._config:
			log_command = self._config['error_log_command']
		elif 'debug_log_command' in self._config:
			log_command = self._config['debug_log_command']
		log_command(message)
		return None

	def _get_mutex(self, real_mutex = True):
		if not real_mutex:
			return dummy_context_mgr()

		mutex = self._config.get('mutex', dummy_context_mgr())
		return mutex
510
511
512
513
514
515
516
517

518
519
520
521
522
523
524
521
522
523
524
525
526
527

528
529
530
531
532
533
534
535







-
+







		if include_length:
			new_message = new_message + [len(message)]
		new_message = new_message + message
		log_message = [] + new_message

		# Transfer the message
		with self._get_mutex(lock):
			result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10)
			result = self._spi.xfer(new_message, self._spi.max_speed_hz, 0)

		self._debug("Writing: {} = {}".format(log_message, result))

		return new_message

	def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'):
		# If we are using a radio transmit queue, just queue this message
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587







588


589
590


591
592
593
594



595
596
597

598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619

620
621



622
623

624
625
626










627
628

629
630
631
632
633
634
635
636
637
638
639









640
641


642
643
644
645
646
647
648

649
650

651
652
653
654






655
656
657
658
659
660






















661
662
663

664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680

681
682
683

684
685
686
687
688
689
690
584
585
586
587
588
589
590

591
592
593
594
595


596
597
598
599
600
601
602
603
604
605


606
607
608
609
610
611
612
613
614
615
616

617






618
619
620
621
622

623
624
625
626
627
628
629
630
631
632
633


634
635
636
637
638
639



640
641
642
643
644
645
646
647
648
649
650

651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671


672
673
674
675
676
677

678
679
680
681
682
683




684
685
686
687
688
689






690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730


731

732

733
734
735
736
737
738
739
740







-





-
-
+
+
+
+
+
+
+

+
+
-
-
+
+




+
+
+


-
+
-
-
-
-
-
-





-










+
-
-
+
+
+


+
-
-
-
+
+
+
+
+
+
+
+
+
+

-
+











+
+
+
+
+
+
+
+
+
-
-
+
+




-


+


+
-
-
-
-
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+



+















-
-
+
-

-
+








				if radio_status['framer_status'] == 0:
					sent_packet = False
					break
				time.sleep(0.001)

		if post_delay != 0:
			raise ValueError('No sleeping allowed') # XXX
			time.sleep(post_delay)

		return sent_packet

	def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'):
		delay = delay / len(channels)
		for channel in channels:
		if len(channels) == 0 or retries == 0:
			self._error("Asked to send the message {} a total of zero times ({} channels, {} retries)".format(message, channels, retries))

		for channel_idx in range(len(channels)):
			if channel_idx == (len(channels) - 1):
				retries -= 1
			channel = channels[channel_idx]
			for i in range(retries):
				if not self.transmit(message, channel, post_delay = 0, syncword = syncword, submit_queue = submit_queue):
					return False
				if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
					return False
		if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
			return False

		return True

	def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0):
		if not self._should_use_queue():
			raise ValueError('internal error: _enqueue called with queueing disabled')

		with self._software_tx_queue_mutex:
			if submit_queue not in self._software_tx_queue:
				self._software_tx_queue[submit_queue] = []
				self._software_tx_queue[submit_queue] = collections.deque([])

			if len(self._software_tx_queue[submit_queue]) == 0:
				dispatch_time = time.time()
			else:
				last_item = self._software_tx_queue[submit_queue][-1]
				dispatch_time = last_item['time'] + last_item['post_delay']

			self._software_tx_queue[submit_queue].append({
				'syncword': syncword,
				'message': message,
				'channel': channel,
				'time': dispatch_time,
				'post_delay': post_delay
			})

		return None

	def _run_queue(self):
		self._debug("Started run_queue process")

		sleep_time = 0
		while True:
			if sleep_time != 0:
			self._debug("Sleeping for {} seconds".format(sleep_time))
			time.sleep(sleep_time)
				self._debug("Sleeping for {} seconds".format(sleep_time))
				time.sleep(sleep_time)

			with self._software_tx_queue_mutex:
				for queue in self._software_tx_queue:
					if len(self._software_tx_queue[queue]) != 0:
					self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue])))
			item_count = self._run_queue_once()
			self._debug("Completed running the queue, {} items left (continue queue = {})".format(item_count, self._should_use_queue()))
						self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue])))

			try:
				[processed_items, remaining_items] = self._run_queue_once()
			except Exception as error_info:
				self._error("Failed to run queue: {}".format(error_info.args))
				processed_items = 0
				remaining_items = 0

			self._debug("Completed running the queue, did {} items and {} items left (continue queue = {})".format(processed_items, remaining_items, self._should_use_queue()))

			if item_count == 0:
			if remaining_items == 0:
				# If the queue is empty and we are no longer queuing
				# events, exit this function (which should be joined)
				if not self._should_use_queue():
					self._debug("Request to stop run_queue process, exiting")
					return None

				# If there are no events, wait a bit
				# longer before trying again
				sleep_time = 0.5
				continue

			if processed_items == 0:
				# If we processed no items, but there are items left to
				# process, back off slightly
				if sleep_time == 0:
					sleep_time = 0.001
				else:
					sleep_time = min(0.5, sleep_time * 2)
				continue

			# If there are more events to process, try again in 1ms
			sleep_time = 0.001
			# If there are more events to process, try again
			sleep_time = 0

		return None

	def _run_queue_once(self):
		now = time.time()
		to_transmit = []
		remaining_items = 0
		now = time.time()
		with self._software_tx_queue_mutex:
			for submit_queue in self._software_tx_queue:
				# Determine if we should run this queue yet
				new_software_tx_queue = []
				for item in self._software_tx_queue[submit_queue]:
					if item['time'] <= now:
						item['from_queue'] = submit_queue
				if submit_queue not in self._software_tx_queue_next_time:
					self._software_tx_queue_next_time[submit_queue] = now

				queue_next_time = self._software_tx_queue_next_time[submit_queue]
				if now < queue_next_time:
					remaining_items += len(self._software_tx_queue[submit_queue])
						to_transmit.append(item)
						continue
					else:
						new_software_tx_queue.append(item)
						remaining_items += 1
				self._software_tx_queue[submit_queue] = new_software_tx_queue
					continue

				# Record how many items to pop off this queue
				pop_items = 0
				for item in self._software_tx_queue[submit_queue]:
					pop_items += 1

					# If the last item we're about to transmit requires a delay, make
					# a note of it in the queue time and don't pull anything else
					# from this queue
					self._software_tx_queue_next_time[submit_queue] = now + item['post_delay']
					if item['post_delay'] != 0:
						break

				# Pop off the items to transmit in this run into a list
				if pop_items != 0:
					self._debug("Found {} items to transmit in the {} queue".format(pop_items, submit_queue))
				while pop_items != 0:
					to_transmit.append(self._software_tx_queue[submit_queue].popleft())
					pop_items -= 1

				remaining_items += len(self._software_tx_queue[submit_queue])

		default_syncword = None

		self._debug("Getting ready to transmit {} items".format(len(to_transmit)))
		with self._get_mutex():
			for item in to_transmit:
				self._debug("Transmitting item {}".format(item))
				syncword = item['syncword']
				message = item['message']
				channel = item['channel']

				if syncword is not None:
					default_syncword = syncword
				else:
					syncword = default_syncword

				if message is None or channel is None:
					continue

				for retry in range(3):
					if self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword):
				self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword, post_delay = 0)
						break

		return remaining_items
		return [len(to_transmit), remaining_items]
		
	def start_listening(self, channel):
		# Initialize the receiver
		self.stop_listening()

		# Go into listening mode
		self.put_register_bits('radio_state', {