LoRa Сканер частот
Понадобится:
- ESP32
- LoRa-02/SX1278/433MHz
- SSD1306 OLED
ulora.py
"""
ULoRa: A lightweight library for the SX127x LoRa module.
This library provides functionalities for configuring and communicating with the SX127x.
It supports functions for setting frequency, TX power, bandwidth, spreading factor, etc.,
as well as packet transmission and reception.
"""
import gc
import machine
from machine import SPI, Pin
from utime import ticks_ms, sleep_ms # ticks_ms and sleep_ms imported from utime
from micropython import const
# ============================================================================
# SX127x Register Definitions
# ============================================================================
REG_FIFO = const(0x00)
REG_OP_MODE = const(0x01)
REG_FRF_MSB = const(0x06)
REG_FRF_MID = const(0x07)
REG_FRF_LSB = const(0x08)
REG_PA_CONFIG = const(0x09)
REG_LNA = const(0x0C)
REG_FIFO_ADDR_PTR = const(0x0D)
REG_FIFO_TX_BASE_ADDR = const(0x0E)
REG_FIFO_RX_BASE_ADDR = const(0x0F)
REG_FIFO_RX_CURRENT_ADDR= const(0x10)
REG_IRQ_FLAGS_MASK = const(0x11)
REG_IRQ_FLAGS = const(0x12)
REG_RX_NB_BYTES = const(0x13)
REG_PKT_SNR_VALUE = const(0x19)
REG_PKT_RSSI_VALUE = const(0x1A)
REG_MODEM_CONFIG_1 = const(0x1D)
REG_MODEM_CONFIG_2 = const(0x1E)
REG_PREAMBLE_MSB = const(0x20)
REG_PREAMBLE_LSB = const(0x21)
REG_PAYLOAD_LENGTH = const(0x22)
REG_FIFO_RX_BYTE_ADDR = const(0x25)
REG_MODEM_CONFIG_3 = const(0x26)
REG_RSSI_WIDEBAND = const(0x2C)
REG_DETECTION_OPTIMIZE = const(0x31)
REG_DETECTION_THRESHOLD = const(0x37)
REG_SYNC_WORD = const(0x39)
REG_DIO_MAPPING_1 = const(0x40)
REG_VERSION = const(0x42)
REG_INVERTIQ = const(0x33)
REG_INVERTIQ2 = const(0x3B)
# ============================================================================
# SX127x Mode and Power Settings
# ============================================================================
MODE_LONG_RANGE_MODE = const(0x80)
MODE_SLEEP = const(0x00)
MODE_STDBY = const(0x01)
MODE_TX = const(0x03)
MODE_RX_CONTINUOUS = const(0x05)
MODE_RX_SINGLE = const(0x06)
PA_OUTPUT_RFO_PIN = const(0)
PA_OUTPUT_PA_BOOST_PIN = const(0x01)
PA_BOOST = const(0x80)
# ============================================================================
# IRQ Masks
# ============================================================================
IRQ_TX_DONE_MASK = const(0x08)
IRQ_PAYLOAD_CRC_ERROR_MASK = const(0x20)
IRQ_RX_DONE_MASK = const(0x40)
IRQ_RX_TIME_OUT_MASK = const(0x80)
# ============================================================================
# IQ Inversion Constants
# ============================================================================
RFLR_INVERTIQ_RX_MASK = const(0xBF)
RFLR_INVERTIQ_RX_OFF = const(0x00)
RFLR_INVERTIQ_RX_ON = const(0x40)
RFLR_INVERTIQ_TX_MASK = const(0xFE)
RFLR_INVERTIQ_TX_OFF = const(0x01)
RFLR_INVERTIQ_TX_ON = const(0x00)
RFLR_INVERTIQ2_ON = const(0x19)
RFLR_INVERTIQ2_OFF = const(0x1D)
# ============================================================================
# Other Definitions
# ============================================================================
MAX_PKT_LENGTH = const(255)
FifoTxBaseAddr = const(0x00)
FifoRxBaseAddr = const(0x00)
# ============================================================================
# Default LoRa Parameters
# ============================================================================
DEFAULT_PARAMETERS = {
"frequency": 433000000,
"frequency_offset": 0,
"tx_power_level": 10,
"signal_bandwidth": 125e3,
"spreading_factor": 9,
"coding_rate": 5,
"preamble_length": 8,
"implicitHeader": False,
"sync_word": 0x12,
"enable_CRC": True,
"invert_IQ": False,
}
# ============================================================================
# ULoRa Class Definition
# ============================================================================
class ULoRa:
"""
ULoRa class to interface with the SX127x LoRa module.
"""
def __init__(self, spi, pins, parameters=None):
"""
Initialize the LoRa module.
:param spi: Initialized SPI object.
:param pins: Dictionary with pin mappings, e.g.,
{"ss": , "reset": , "dio0": }.
:param parameters: (Optional) Dictionary with LoRa configuration parameters.
"""
self.spi = spi
self.pins = pins
self.parameters = DEFAULT_PARAMETERS.copy()
if parameters:
self.parameters.update(parameters)
# Setup slave select (CS) pin
self.pin_ss = Pin(self.pins["ss"], Pin.OUT)
# Setup reset pin if provided and perform a hardware reset
if "reset" in self.pins:
self.pin_reset = Pin(self.pins["reset"], Pin.OUT)
self.reset_module()
else:
self.pin_reset = None
self.lock = False
self.implicit_header_mode = None
# Check LoRa module version
version = None
for _ in range(5):
version = self.read_register(REG_VERSION)
if version:
break
print("SX127x Version: {}".format(version))
if version != 0x12: # Expected version is 0x12 (18 in decimal)
print("Bad LoRa Connection! Version: {}".format(version))
machine.reset()
else:
print("LoRa Connection OK! Version: {}".format(version))
# Put module in sleep mode for configuration
self.sleep()
# Configure LoRa parameters
self.set_frequency(self.parameters["frequency"])
self.set_signal_bandwidth(self.parameters["signal_bandwidth"])
# Enable LNA boost and auto AGC
self.write_register(REG_LNA, self.read_register(REG_LNA) | 0x03)
self.write_register(REG_MODEM_CONFIG_3, 0x04)
self.set_tx_power(self.parameters["tx_power_level"])
self.set_implicit_header(self.parameters["implicitHeader"])
self.set_spreading_factor(self.parameters["spreading_factor"])
self.set_coding_rate(self.parameters["coding_rate"])
self.set_preamble_length(self.parameters["preamble_length"])
self.set_sync_word(self.parameters["sync_word"])
self.enable_crc(self.parameters["enable_CRC"])
self.invert_iq(self.parameters["invert_IQ"])
# Enable LowDataRateOptimize if symbol duration > 16ms
bw = self.parameters["signal_bandwidth"]
sf = self.parameters["spreading_factor"]
if (1000 / bw / (2 ** sf)) > 16:
self.write_register(REG_MODEM_CONFIG_3, self.read_register(REG_MODEM_CONFIG_3) | 0x08)
# Set FIFO base addresses
self.write_register(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr)
self.write_register(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr)
self.standby()
def reset_module(self):
"""
Perform a hardware reset of the LoRa module using the reset pin.
"""
if self.pin_reset is None:
return
# Drive reset pin low for 100ms then high
self.pin_reset.value(0)
sleep_ms(100)
self.pin_reset.value(1)
sleep_ms(100)
# ---------------------------
# Packet Transmission Methods
# ---------------------------
def begin_packet(self, implicit_header=None):
"""
Prepare the module for packet transmission.
:param implicit_header: (Optional) Boolean to override current header mode.
"""
self.standby()
if implicit_header is not None:
self.set_implicit_header(implicit_header)
# Reset FIFO pointer and payload length
self.write_register(REG_FIFO_ADDR_PTR, FifoTxBaseAddr)
self.write_register(REG_PAYLOAD_LENGTH, 0)
def write(self, buffer):
"""
Write data to the LoRa FIFO.
:param buffer: Data bytes (or bytearray) to be sent.
:return: Number of bytes written.
"""
current_length = self.read_register(REG_PAYLOAD_LENGTH)
size = len(buffer)
# Ensure packet does not exceed maximum allowed length
size = min(size, MAX_PKT_LENGTH - FifoTxBaseAddr - current_length)
for byte in buffer:
self.write_register(REG_FIFO, byte)
# Update payload length
self.write_register(REG_PAYLOAD_LENGTH, current_length + size)
return size
def end_packet(self):
"""
Transmit the packet and wait until transmission is complete.
"""
# Set module to TX mode
self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX)
# Wait until TX_DONE flag is set
while (self.read_register(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0:
pass
# Clear TX_DONE flag
self.write_register(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK)
def println(self, message, implicit_header=None, repeat=1):
"""
Transmit a text message.
:param message: String message to transmit.
:param implicit_header: (Optional) Boolean to override header mode.
:param repeat: Number of times to send the message.
"""
if isinstance(message, str):
message = message.encode()
self.begin_packet(implicit_header)
self.write(message)
for _ in range(repeat):
self.end_packet()
self.collect_garbage()
# ---------------------------
# Packet Reception Methods
# ---------------------------
def receive(self, size=0):
"""
Set the module to continuous receive mode.
:param size: Expected payload size. If >0, uses implicit header mode.
"""
self.set_implicit_header(size > 0)
if size > 0:
self.write_register(REG_PAYLOAD_LENGTH, size & 0xFF)
self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS)
def listen(self, timeout=1000):
"""
Listen for an incoming packet for a specified timeout.
:param timeout: Timeout in milliseconds.
:return: Received payload as bytes, or None if timeout occurs.
"""
self.receive()
start = ticks_ms()
while True:
if self.received_packet():
return self.read_payload()
if ticks_ms() - start > timeout:
return None
def received_packet(self, size=0):
"""
Check if a packet has been received.
:param size: Expected payload size. If >0, uses implicit header mode.
:return: True if a packet is received, otherwise False.
"""
irq_flags = self.get_irq_flags()
self.set_implicit_header(size > 0)
if size > 0:
self.write_register(REG_PAYLOAD_LENGTH, size & 0xFF)
if irq_flags == IRQ_RX_DONE_MASK:
return True
else:
# If not in single RX mode, reset FIFO pointer and enter single RX mode
if self.read_register(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE):
self.write_register(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE)
return False
def read_payload(self):
"""
Read the received payload from the FIFO.
:return: Payload data as bytes.
"""
# Set FIFO pointer to the current RX address
self.write_register(REG_FIFO_ADDR_PTR, self.read_register(REG_FIFO_RX_CURRENT_ADDR))
# Determine payload length based on header mode
if self.implicit_header_mode:
packet_length = self.read_register(REG_PAYLOAD_LENGTH)
else:
packet_length = self.read_register(REG_RX_NB_BYTES)
payload = bytearray()
for _ in range(packet_length):
payload.append(self.read_register(REG_FIFO))
self.collect_garbage()
return bytes(payload)
def get_irq_flags(self):
"""
Retrieve and clear the IRQ flags.
:return: IRQ flags value.
"""
irq_flags = self.read_register(REG_IRQ_FLAGS)
self.write_register(REG_IRQ_FLAGS, irq_flags)
return irq_flags
def packet_rssi(self, high_frequency=True):
"""
Get the RSSI value of the last received packet.
:param high_frequency: Boolean flag; if True, uses high frequency offset.
:return: Adjusted RSSI value.
"""
rssi = self.read_register(REG_PKT_RSSI_VALUE)
return rssi - (157 if high_frequency else 164)
def packet_snr(self):
"""
Get the SNR (Signal-to-Noise Ratio) of the last packet.
:return: SNR value.
"""
return self.read_register(REG_PKT_SNR_VALUE) * 0.25
# ---------------------------
# Module Mode Methods
# ---------------------------
def standby(self):
"""
Set the module to standby mode.
"""
self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY)
def sleep(self):
"""
Put the module into sleep mode.
"""
self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP)
# ---------------------------
# Configuration Methods
# ---------------------------
def set_tx_power(self, level, output_pin=PA_OUTPUT_PA_BOOST_PIN):
"""
Set the transmission power level.
:param level: Power level.
:param output_pin: PA output type (PA_OUTPUT_RFO_PIN or PA_OUTPUT_PA_BOOST_PIN).
"""
self.parameters["tx_power_level"] = level
if output_pin == PA_OUTPUT_RFO_PIN:
level = min(max(level, 0), 14)
self.write_register(REG_PA_CONFIG, 0x70 | level)
else:
level = min(max(level, 2), 17)
self.write_register(REG_PA_CONFIG, PA_BOOST | (level - 2))
def set_frequency(self, frequency):
"""
Set the operating frequency.
:param frequency: Frequency in Hz.
"""
self.parameters["frequency"] = frequency
frequency += self.parameters["frequency_offset"]
frf = (frequency << 19) // 32000000
self.write_register(REG_FRF_MSB, (frf >> 16) & 0xFF)
self.write_register(REG_FRF_MID, (frf >> 8) & 0xFF)
self.write_register(REG_FRF_LSB, frf & 0xFF)
def set_spreading_factor(self, sf):
"""
Set the spreading factor (6 to 12).
:param sf: Spreading factor.
"""
sf = min(max(sf, 6), 12)
if sf == 6:
self.write_register(REG_DETECTION_OPTIMIZE, 0xC5)
self.write_register(REG_DETECTION_THRESHOLD, 0x0C)
else:
self.write_register(REG_DETECTION_OPTIMIZE, 0xC3)
self.write_register(REG_DETECTION_THRESHOLD, 0x0A)
current = self.read_register(REG_MODEM_CONFIG_2) & 0x0F
self.write_register(REG_MODEM_CONFIG_2, current | ((sf << 4) & 0xF0))
def set_signal_bandwidth(self, sbw):
"""
Set the signal bandwidth.
:param sbw: Bandwidth in Hz.
"""
# Predefined bandwidth bins
bins = (7800, 10400, 15600, 20800, 31250, 41700, 62500, 125000, 250000)
bw_index = 7 # Default to 125 kHz
if sbw < 10:
bw_index = int(sbw)
else:
for i, bw in enumerate(bins):
if sbw <= bw:
bw_index = i
break
current = self.read_register(REG_MODEM_CONFIG_1) & 0x0F
self.write_register(REG_MODEM_CONFIG_1, current | (bw_index << 4))
def set_coding_rate(self, denominator):
"""
Set the coding rate.
:param denominator: Denominator (between 5 and 8).
"""
denominator = min(max(denominator, 5), 8)
cr = denominator - 4
current = self.read_register(REG_MODEM_CONFIG_1) & 0xF1
self.write_register(REG_MODEM_CONFIG_1, current | (cr << 1))
def set_preamble_length(self, length):
"""
Set the preamble length.
:param length: Preamble length.
"""
self.write_register(REG_PREAMBLE_MSB, (length >> 8) & 0xFF)
self.write_register(REG_PREAMBLE_LSB, length & 0xFF)
def enable_crc(self, enable_crc):
"""
Enable or disable CRC checking.
:param enable_crc: Boolean flag.
"""
modem_config_2 = self.read_register(REG_MODEM_CONFIG_2)
if enable_crc:
config = modem_config_2 | 0x04
else:
config = modem_config_2 & 0xFB
self.write_register(REG_MODEM_CONFIG_2, config)
def invert_iq(self, invert):
"""
Invert the IQ signals.
:param invert: Boolean flag.
"""
self.parameters["invert_IQ"] = invert
current = self.read_register(REG_INVERTIQ)
if invert:
new_val = (current & RFLR_INVERTIQ_TX_MASK & RFLR_INVERTIQ_RX_MASK) | RFLR_INVERTIQ_RX_ON | RFLR_INVERTIQ_TX_ON
self.write_register(REG_INVERTIQ, new_val)
self.write_register(REG_INVERTIQ2, RFLR_INVERTIQ2_ON)
else:
new_val = (current & RFLR_INVERTIQ_TX_MASK & RFLR_INVERTIQ_RX_MASK) | RFLR_INVERTIQ_RX_OFF | RFLR_INVERTIQ_TX_OFF
self.write_register(REG_INVERTIQ, new_val)
self.write_register(REG_INVERTIQ2, RFLR_INVERTIQ2_OFF)
def set_sync_word(self, sw):
"""
Set the synchronization word.
:param sw: Sync word.
"""
self.write_register(REG_SYNC_WORD, sw)
def set_implicit_header(self, implicit):
"""
Set the module to use implicit or explicit header mode.
:param implicit: Boolean flag.
"""
if self.implicit_header_mode != implicit:
self.implicit_header_mode = implicit
modem_config_1 = self.read_register(REG_MODEM_CONFIG_1)
if implicit:
config = modem_config_1 | 0x01
else:
config = modem_config_1 & 0xFE
self.write_register(REG_MODEM_CONFIG_1, config)
# ---------------------------
# Low-Level SPI Methods
# ---------------------------
def read_register(self, address):
"""
Read a byte from the specified register.
:param address: Register address.
:return: Value read.
"""
response = self.transfer(address & 0x7F)
return int.from_bytes(response, 'big')
def write_register(self, address, value):
"""
Write a byte to the specified register.
:param address: Register address.
:param value: Value to write.
"""
self.transfer(address | 0x80, value)
def transfer(self, address, value=0x00):
"""
Perform an SPI transfer.
:param address: Register address.
:param value: Byte to write.
:return: Response as a bytearray.
"""
response = bytearray(1)
self.pin_ss.value(0)
# Write the register address
self.spi.write(bytes([address]))
# Write the value and simultaneously read the response
self.spi.write_readinto(bytes([value]), response)
self.pin_ss.value(1)
return response
def dump_registers(self):
"""
Dump the first 128 registers for debugging purposes.
"""
for i in range(128):
print("0x{:02X}: {:02X}".format(i, self.read_register(i)), end="")
if (i+1) % 4 == 0:
print()
else:
print(" | ", end="")
def collect_garbage(self):
"""
Run garbage collection.
"""
gc.collect()
ssd1308.py
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
import time
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xa4)
SET_NORM_INV = const(0xa6)
SET_DISP = const(0xae)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xa0)
SET_MUX_RATIO = const(0xa8)
SET_COM_OUT_DIR = const(0xc0)
SET_DISP_OFFSET = const(0xd3)
SET_COM_PIN_CFG = const(0xda)
SET_DISP_CLK_DIV = const(0xd5)
SET_PRECHARGE = const(0xd9)
SET_VCOM_DESEL = const(0xdb)
SET_CHARGE_PUMP = const(0x8d)
class SSD1306:
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
# Note the subclass must initialize self.framebuf to a framebuffer.
# This is necessary because the underlying data buffer is different
# between I2C and SPI implementations (I2C needs an extra byte).
self.poweron()
self.init_display()
def init_display(self):
for cmd in (
SET_DISP | 0x00, # off
# address setting
SET_MEM_ADDR, 0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO, self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET, 0x00,
SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV, 0x80,
SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
SET_VCOM_DESEL, 0x30, # 0.83*Vcc
# display
SET_CONTRAST, 0xff, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP | 0x00)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_framebuf()
def fill(self, col):
self.framebuf.fill(col)
def pixel(self, x, y, col):
self.framebuf.pixel(x, y, col)
def scroll(self, dx, dy):
self.framebuf.scroll(dx, dy)
def text(self, string, x, y, col=1):
self.framebuf.text(string, x, y, col)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
# Add an extra byte to the data buffer to hold an I2C data/command byte
# to use hardware-compatible I2C transactions. A memoryview of the
# buffer is used to mask this byte from the framebuffer operations
# (without a major memory hit as memoryview doesn't copy to a separate
# buffer).
self.buffer = bytearray(((height // 8) * width) + 1)
self.buffer[0] = 0x40 # Set first byte of data buffer to Co=0, D/C=1
self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_framebuf(self):
# Blast out the frame buffer using a single I2C transaction to support
# hardware I2C interfaces.
self.i2c.writeto(self.addr, self.buffer)
def poweron(self):
pass
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
self.buffer = bytearray((height // 8) * width)
self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.low()
self.cs.low()
self.spi.write(bytearray([cmd]))
self.cs.high()
def write_framebuf(self):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.high()
self.cs.low()
self.spi.write(self.buffer)
self.cs.high()
def poweron(self):
self.res.high()
time.sleep_ms(1)
self.res.low()
time.sleep_ms(10)
self.res.high()
led_driver.py
from machine import Pin, SoftI2C
import ssd1308
from time import sleep
# Настройка дисплея
i2c = SoftI2C(scl=Pin(22), sda=Pin(21))
oled = ssd1308.SSD1306_I2C(128, 64, i2c, addr=0x3C)
def print_like_console(text, x=0, y=0, delay=0.05):
oled.fill(0)
for i in range(len(text)):
oled.text(text[i], x + i * 6, y) # 6 пикселей на символ
oled.show()
sleep(delay)
def print_multiline_console(text_lines, delay=0.03):
oled.fill(0)
line_height = 9 # чуть больше 8, чтобы строки не слипались
for idx, line in enumerate(text_lines):
oled.text(line, 0, idx * line_height)
oled.show()
sleep(delay)
#print_like_console('hhhhhh', x=0, y=0, delay=0.05)
main.py
Настройки меняем в коде:
# Диапазон для сканирования (в MHz)
SCAN_RANGE = [410, 421] # частота от и до
SCAN_STEP = 0.1 # MHz шаг сканирования
SCAN_TIME = 1.0 # время сканирования в секундах
# lora_scanner.py - LoRa Channel Scanner
import machine
from machine import SPI, Pin
import utime
from ulora import ULoRa
from led_driver import print_like_console, print_multiline_console
# Конфигурация LoRa
LORA_PARAMS = {
"frequency": 433000000, # Начальная частота
"tx_power_level": 18,
"signal_bandwidth": 125e3,
"spreading_factor": 12,
"coding_rate": 8,
"preamble_length": 12,
"implicitHeader": False,
"sync_word": 0x12,
"enable_CRC": True,
"invert_IQ": False,
}
LORA_PINS = {
"ss": 5,
"reset": 14,
"dio0": 2,
}
# Диапазон для сканирования (в MHz)
SCAN_RANGE = [410, 421] # MHz
SCAN_STEP = 0.1 # MHz
SCAN_TIME = 1.0 # seconds per frequency
spi = SPI(1, baudrate=10000000, polarity=0, phase=0,
sck=Pin(18), mosi=Pin(23), miso=Pin(19))
lora = ULoRa(spi, LORA_PINS, LORA_PARAMS)
led = Pin(25, Pin.OUT)
def blink_led(times=1, delay=100):
for _ in range(times):
led.value(1)
utime.sleep_ms(delay)
led.value(0)
utime.sleep_ms(delay)
def scan_channels():
"""Сканирование диапазона частот и анализ загруженности"""
results = []
# Генерация списка частот для сканирования (в герцах)
start_freq = int(SCAN_RANGE[0] * 1000000) # Convert to Hz
end_freq = int(SCAN_RANGE[1] * 1000000)
step = int(SCAN_STEP * 1000000)
frequencies = list(range(start_freq, end_freq + 1, step))
print(f"Scanning {len(frequencies)} channels...")
print_multiline_console(["Scanning...", f"0/{len(frequencies)}", ""], 0)
for i, freq in enumerate(frequencies):
# Установка новой частоты
lora.set_frequency(freq)
# Обновление информации на дисплее
progress = f"{i+1}/{len(frequencies)}"
freq_info = f"{freq/1000000:.1f}MHz"
print_multiline_console(["Scanning...", progress, freq_info], 0)
packet_count = 0
rssi_values = []
start_time = utime.ticks_ms()
# Сбор статистики за указанное время
while utime.ticks_diff(utime.ticks_ms(), start_time) < SCAN_TIME * 1000:
if lora.received_packet():
lora.read_payload() # Clear buffer
packet_count += 1
rssi_values.append(lora.packet_rssi())
utime.sleep_ms(10)
# Расчет средней мощности сигнала
avg_rssi = sum(rssi_values) / len(rssi_values) if rssi_values else -120
results.append({
'frequency': freq,
'packets': packet_count,
'avg_rssi': avg_rssi,
'activity': packet_count / SCAN_TIME # Пакетов в секунду
})
blink_led(1, 50) # Индикация прогресса
return results
def analyze_results(results):
"""Анализ и отображение результатов сканирования"""
# Сортировка по активности
sorted_results = sorted(results, key=lambda x: x['activity'], reverse=True)
print("\n=== SCAN RESULTS ===")
print("Freq(MHz)\tPkts/s\tRSSI(dBm)\tStatus")
for res in sorted_results[:10]: # Показать топ-10 каналов
freq_mhz = res['frequency'] / 1000000
status = "HIGH TRAFFIC" if res['activity'] > 5 else "MODERATE" if res['activity'] > 1 else "LOW"
print(f"{freq_mhz:.1f}\t\t{res['activity']:.1f}\t{res['avg_rssi']:.1f}\t\t{status}")
# Поиск наименее загруженного канала
best_channel = min(results, key=lambda x: x['activity'])
best_freq = best_channel['frequency'] / 1000000
print(f"\nRecommended channel: {best_freq:.1f} MHz")
print(f"Activity: {best_channel['activity']:.1f} packets/sec")
print(f"Signal strength: {best_channel['avg_rssi']:.1f} dBm")
# Вывод на дисплей
display_lines = [
"Scan Complete!",
f"Best: {best_freq:.1f}MHz",
f"Activity: {best_channel['activity']:.1f}p/s",
f"RSSI: {best_channel['avg_rssi']:.1f}dBm"
]
print_multiline_console(display_lines, 0.05)
# Основной цикл
try:
print("Starting LoRa channel scanner...")
scan_results = scan_channels()
analyze_results(scan_results)
except KeyboardInterrupt:
print("\nScan interrupted")
except Exception as e:
print(f"Error: {e}")
finally:
lora.sleep()
print("Scanner stopped")

Комментарии
Отправить комментарий