Python LT8900 via SPI

Check-in [089d2c0ce8]
Login

Check-in [089d2c0ce8]

Overview
Comment:Added support for multiple queues, and ensured queue management is locked
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 089d2c0ce83867ff200cabf54928393f5ad1fed44ccffe9bfed135e1b57124dc
User & Date: rkeene on 2020-05-01 01:33:30
Other Links: manifest | tags
Context
2020-05-02
15:10
Substantial improvement to software queue management check-in: 5f845c3e3c user: rkeene tags: trunk
2020-05-01
01:33
Added support for multiple queues, and ensured queue management is locked check-in: 089d2c0ce8 user: rkeene tags: trunk
2020-04-30
17:50
Added a software tx queue and support for setting the syncword as part of the transmit call check-in: fc5d84e9e1 user: rkeene tags: trunk
Changes

Modified lt8900_spi/__init__.py from [05e9f1b5b0] to [3c959f71b4].

226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244


245
246
247
248
249
250
251

	def __init__(self, spi_bus, spi_dev, config = None):
		spi = spidev.SpiDev()
		spi.open(spi_bus, spi_dev)
		self._spi = spi

		self._dequeue_thread = None
		self._syncword = None

		self.configure(config)

		if len(self._register_map) != 53:
			raise ValueError('Inconsistent register map!')

		self._channel_queue = {}

		return None

	def __del__(self):


		self._spi.close()

	def _debug(self, message):
		if self._config is not None and 'debug_log_command' in self._config:
			self._config['debug_log_command'](message)
		return None








|

|




|




>
>







226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253

	def __init__(self, spi_bus, spi_dev, config = None):
		spi = spidev.SpiDev()
		spi.open(spi_bus, spi_dev)
		self._spi = spi

		self._dequeue_thread = None
		self._last_syncword = None

		self.configure(config, update = False)

		if len(self._register_map) != 53:
			raise ValueError('Inconsistent register map!')

		self._software_tx_queue = {}

		return None

	def __del__(self):
		self._debug('Deleting object')
		self._config['use_software_tx_queue'] = False
		self._spi.close()

	def _debug(self, message):
		if self._config is not None and 'debug_log_command' in self._config:
			self._config['debug_log_command'](message)
		return None

411
412
413
414
415
416
417
418
419
420
421



422
423
424
425
426
427
428
429
430
431
432
433
434
435
436

437
438
439

440
441

442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
			mask = ((1 << (bit_range[1] - bit_range[0] + 1)) - 1) << bit_range[0]
			key_value = (value & mask) >> bit_range[0]
			result[key] = key_value

		# Return the filled in structure
		return result

	def configure(self, config):
		if config is None:
			config = {}




		self._config = config

		with self._get_mutex():
			self._spi.max_speed_hz = self._config.get('frequency', 4000000)
			self._spi.bits_per_word = self._config.get('bits_per_word', 8)
			self._spi.cshigh = self._config.get('csigh', False)
			self._spi.no_cs  = self._config.get('no_cs', False)
			self._spi.lsbfirst = self._config.get('lsbfirst', False)
			self._spi.threewire = self._config.get('threewire', False)
			self._spi.mode = self._config.get('mode', 1)

		# If using a queue, start a thread to run the queue
		if self._should_use_queue():
			if self._dequeue_thread is None:
				self._dequeue_thread = threading.Thread(target = self._run_queue)

				self._dequeue_thread.start()
		else:
			if self._dequeue_thread is not None:

				self._dequeue_thread.join()
				self._dequeue_thread = None


		return None

	def initialize(self):
		self._reset_device()

		self._set_defaults()

		if not self._check_radio():
			return False
		return True

	def set_channel(self, channel):
		state = self.get_register_bits('radio_state')
		state['channel'] = channel

		self.put_register_bits('radio_state', state)

		return state

	def set_syncword(self, syncword, force = False, _queue_instead_of_xmit = True):
		# If queuing is being used, just store this message
		if _queue_instead_of_xmit and self._should_use_queue():
			self._enqueue(syncword, None, None)
			return None

		# Do not set the syncword again if it's not needed
		if not force:
			if self._syncword is not None:
				if syncword == self._syncword:
					return None

		self._syncword = syncword

		packet_config = self.get_register_bits('packet_config')
		packet_config['syncword_len'] = len(syncword) - 1

		self.put_register_bits('packet_config', packet_config)

		if len(syncword) == 1:







|



>
>
>
|













|
>



>


>




















|

|
|




|
|


|







413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
			mask = ((1 << (bit_range[1] - bit_range[0] + 1)) - 1) << bit_range[0]
			key_value = (value & mask) >> bit_range[0]
			result[key] = key_value

		# Return the filled in structure
		return result

	def configure(self, config, update = True):
		if config is None:
			config = {}

		if update:
			self._config.update(config)
		else:
			self._config = config

		with self._get_mutex():
			self._spi.max_speed_hz = self._config.get('frequency', 4000000)
			self._spi.bits_per_word = self._config.get('bits_per_word', 8)
			self._spi.cshigh = self._config.get('csigh', False)
			self._spi.no_cs  = self._config.get('no_cs', False)
			self._spi.lsbfirst = self._config.get('lsbfirst', False)
			self._spi.threewire = self._config.get('threewire', False)
			self._spi.mode = self._config.get('mode', 1)

		# If using a queue, start a thread to run the queue
		if self._should_use_queue():
			if self._dequeue_thread is None:
				self._dequeue_thread = threading.Thread(target = self._run_queue, daemon = True)
				self._software_tx_queue_mutex = threading.Lock()
				self._dequeue_thread.start()
		else:
			if self._dequeue_thread is not None:
				self._debug("Joining existing thread to wait for termination")
				self._dequeue_thread.join()
				self._dequeue_thread = None
				self._software_tx_queue_mutex = None

		return None

	def initialize(self):
		self._reset_device()

		self._set_defaults()

		if not self._check_radio():
			return False
		return True

	def set_channel(self, channel):
		state = self.get_register_bits('radio_state')
		state['channel'] = channel

		self.put_register_bits('radio_state', state)

		return state

	def set_syncword(self, syncword, force = False, submit_queue = '__DEFAULT__'):
		# If queuing is being used, just store this message
		if submit_queue is not None and self._should_use_queue():
			self._enqueue(submit_queue, syncword, None, None, post_delay = 0)
			return None

		# Do not set the syncword again if it's not needed
		if not force:
			if self._last_syncword is not None:
				if syncword == self._last_syncword:
					return None

		self._last_syncword = syncword

		packet_config = self.get_register_bits('packet_config')
		packet_config['syncword_len'] = len(syncword) - 1

		self.put_register_bits('packet_config', packet_config)

		if len(syncword) == 1:
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
		with self._get_mutex(lock):
			result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10)

		self._debug("Writing: {} = {}".format(log_message, result))

		return new_message

	def transmit(self, message, channel = None, lock = True, _queue_instead_of_xmit = True, post_delay = 0, syncword = None):
		# If we are using a radio transmit queue, just queue this message
		# (unless we are called from the dequeue procedure)
		if _queue_instead_of_xmit and self._should_use_queue():
			if syncword is None:
				syncword = self._syncword
			self._enqueue(syncword, message, channel, post_delay = post_delay)
			return True

		sent_packet = True

		with self._get_mutex(lock):
			# Set the syncword
			if syncword is not None:
				self.set_syncword(syncword, _queue_instead_of_xmit = False)

			if channel is None:
				state = self.get_register_bits('radio_state')
				channel = state['channel']

			# Initialize the transmitter
			self.put_register_bits('radio_state', {







|


|

|
|







|







516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
		with self._get_mutex(lock):
			result = self._spi.xfer(new_message, self._spi.max_speed_hz, 10)

		self._debug("Writing: {} = {}".format(log_message, result))

		return new_message

	def transmit(self, message, channel = None, lock = True, post_delay = 0, syncword = None, submit_queue = '__DEFAULT__'):
		# If we are using a radio transmit queue, just queue this message
		# (unless we are called from the dequeue procedure)
		if submit_queue is not None and self._should_use_queue():
			if syncword is None:
				syncword = self._last_syncword
			self._enqueue(submit_queue, syncword, message, channel, post_delay = post_delay)
			return True

		sent_packet = True

		with self._get_mutex(lock):
			# Set the syncword
			if syncword is not None:
				self.set_syncword(syncword, submit_queue = None)

			if channel is None:
				state = self.get_register_bits('radio_state')
				channel = state['channel']

			# Initialize the transmitter
			self.put_register_bits('radio_state', {
565
566
567
568
569
570
571

572
573
574
575
576
577
578
579
580
581
582
583
584
585

586

587

588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612

613



614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637

638
639
640
641
642
643
644
645
646
647
648
649

650
651
652
653
654
655
656
657
658
659
660

661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684

				if radio_status['framer_status'] == 0:
					sent_packet = False
					break
				time.sleep(0.001)

		if post_delay != 0:

			time.sleep(post_delay)

		return sent_packet

	def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None):
		delay = delay / len(channels)
		for channel in channels:
			for i in range(retries):
				if not self.transmit(message, channel, post_delay = delay, syncword = syncword):
					return False

		return True

	def _enqueue(self, syncword, message, channel, post_delay = 0.01):

		if channel not in self._channel_queue:

			self._channel_queue[channel] = []

			dispatch_time = time.time()
		else:
			last_item = self._channel_queue[channel][-1]
			dispatch_time = last_item['time'] + last_item['post_delay']

		self._channel_queue[channel].append({
			'syncword': syncword,
			'message': message,
			'channel': channel,
			'time': dispatch_time,
			'post_delay': post_delay,
			'transmitted': False
		})

		return None

	def _run_queue(self):
		self._debug("Started run_queue process")
		# Do not run the queue if we are using a queue
		if not self._should_use_queue():
			self._debug("Ending run_queue process, not using the queue")
			return None

		sleep_time = 0
		while True:

			time.sleep(sleep_time)



			item_count = self._run_queue_once()
			self._debug("Running the queue, {} items left".format(item_count))

			if item_count == 0:
				# If the queue is empty and we are no longer queuing
				# events, exit this function (which should be joined)
				if not self._should_use_queue():
					self._debug("Request to stop run_queue process, exiting")
					break

				# If there are no events, wait a bit
				# longer before trying again
				sleep_time = 0.5
				continue

			# If there are more events to process, try again in 1ms
			sleep_time = 0.001

		return None

	def _run_queue_once(self):
		now = time.time()
		to_transmit = []
		items_remaining = 0

		for channel in self._channel_queue:
			found_item_in_queue = False
			for item in self._channel_queue[channel]:
				if item['transmitted']:
					continue

				found_item_in_queue = True
				if item['time'] <= now:
					item['transmitted'] = True
					to_transmit.append(item)
					continue


				items_remaining += 1
			# If the channel has only transmitted items, clear it
			# out except for the last item (which will have a
			# record of when the next event should take place)
			if not found_item_in_queue:
				self._channel_queue[channel] = [self._channel_queue[channel][-1]]

		default_syncword = None

		with self._get_mutex():
			for item in to_transmit:

				syncword = item['syncword']
				message = item['message']
				channel = item['channel']

				if syncword is not None:
					default_syncword = syncword
				else:
					syncword = default_syncword

				if message is None or channel is None:
					continue

				for retry in range(3):
					if self.transmit(message, channel, lock = False, _queue_instead_of_xmit = False, syncword = syncword):
						break

		return items_remaining
		
	def start_listening(self, channel):
		# Initialize the receiver
		self.stop_listening()

		# Go into listening mode
		self.put_register_bits('radio_state', {







>




|



|




|
>
|
>
|
>
|
|
|
|

|
|
|
|
|
|
<
|





<
<
<
<



>

>
>
>

|






|














|
>
|
|
|
<
<
<
<
|
|
|
|
|
>
|
<
<
<
|
<





>













|


|







573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610

611
612
613
614
615
616




617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652




653
654
655
656
657
658
659



660

661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690

				if radio_status['framer_status'] == 0:
					sent_packet = False
					break
				time.sleep(0.001)

		if post_delay != 0:
			raise ValueError('No sleeping allowed') # XXX
			time.sleep(post_delay)

		return sent_packet

	def multi_transmit(self, message, channels, retries = 3, delay = 0.1, syncword = None, submit_queue = '__DEFAULT__'):
		delay = delay / len(channels)
		for channel in channels:
			for i in range(retries):
				if not self.transmit(message, channel, post_delay = delay, syncword = syncword, submit_queue = submit_queue):
					return False

		return True

	def _enqueue(self, submit_queue, syncword, message, channel, post_delay = 0):
		with self._software_tx_queue_mutex:
			if submit_queue not in self._software_tx_queue:
				self._software_tx_queue[submit_queue] = []

			if len(self._software_tx_queue[submit_queue]) == 0:
				dispatch_time = time.time()
			else:
				last_item = self._software_tx_queue[submit_queue][-1]
				dispatch_time = last_item['time'] + last_item['post_delay']

			self._software_tx_queue[submit_queue].append({
				'syncword': syncword,
				'message': message,
				'channel': channel,
				'time': dispatch_time,
				'post_delay': post_delay

			})

		return None

	def _run_queue(self):
		self._debug("Started run_queue process")





		sleep_time = 0
		while True:
			self._debug("Sleeping for {} seconds".format(sleep_time))
			time.sleep(sleep_time)
			with self._software_tx_queue_mutex:
				for queue in self._software_tx_queue:
					self._debug("Running the queue named {}: {} items left".format(queue, len(self._software_tx_queue[queue])))
			item_count = self._run_queue_once()
			self._debug("Completed running the queue, {} items left (continue queue = {})".format(item_count, self._should_use_queue()))

			if item_count == 0:
				# If the queue is empty and we are no longer queuing
				# events, exit this function (which should be joined)
				if not self._should_use_queue():
					self._debug("Request to stop run_queue process, exiting")
					return None

				# If there are no events, wait a bit
				# longer before trying again
				sleep_time = 0.5
				continue

			# If there are more events to process, try again in 1ms
			sleep_time = 0.001

		return None

	def _run_queue_once(self):
		now = time.time()
		to_transmit = []
		remaining_items = 0
		with self._software_tx_queue_mutex:
			for submit_queue in self._software_tx_queue:
				new_software_tx_queue = []
				for item in self._software_tx_queue[submit_queue]:




					if item['time'] <= now:
						item['from_queue'] = submit_queue
						to_transmit.append(item)
						continue
					else:
						new_software_tx_queue.append(item)
						remaining_items += 1



				self._software_tx_queue[submit_queue] = new_software_tx_queue


		default_syncword = None

		with self._get_mutex():
			for item in to_transmit:
				self._debug("Transmitting item {}".format(item))
				syncword = item['syncword']
				message = item['message']
				channel = item['channel']

				if syncword is not None:
					default_syncword = syncword
				else:
					syncword = default_syncword

				if message is None or channel is None:
					continue

				for retry in range(3):
					if self.transmit(message, channel, lock = False, submit_queue = None, syncword = syncword):
						break

		return remaining_items
		
	def start_listening(self, channel):
		# Initialize the receiver
		self.stop_listening()

		# Go into listening mode
		self.put_register_bits('radio_state', {