LoRa Сканер частот

 Понадобится:

  • ESP32
  • LoRa-02/SX1278/433MHz
  • SSD1306 OLED

 

 


 

 ulora.py


"""
ULoRa: A lightweight library for the SX127x LoRa module.
This library provides functionalities for configuring and communicating with the SX127x.
It supports functions for setting frequency, TX power, bandwidth, spreading factor, etc.,
as well as packet transmission and reception.

"""

import gc
import machine
from machine import SPI, Pin
from utime import ticks_ms, sleep_ms  # ticks_ms and sleep_ms imported from utime
from micropython import const

# ============================================================================
# SX127x Register Definitions
# ============================================================================
REG_FIFO                = const(0x00)
REG_OP_MODE             = const(0x01)
REG_FRF_MSB             = const(0x06)
REG_FRF_MID             = const(0x07)
REG_FRF_LSB             = const(0x08)
REG_PA_CONFIG           = const(0x09)
REG_LNA                 = const(0x0C)
REG_FIFO_ADDR_PTR       = const(0x0D)
REG_FIFO_TX_BASE_ADDR   = const(0x0E)
REG_FIFO_RX_BASE_ADDR   = const(0x0F)
REG_FIFO_RX_CURRENT_ADDR= const(0x10)
REG_IRQ_FLAGS_MASK      = const(0x11)
REG_IRQ_FLAGS           = const(0x12)
REG_RX_NB_BYTES         = const(0x13)
REG_PKT_SNR_VALUE       = const(0x19)
REG_PKT_RSSI_VALUE      = const(0x1A)
REG_MODEM_CONFIG_1      = const(0x1D)
REG_MODEM_CONFIG_2      = const(0x1E)
REG_PREAMBLE_MSB        = const(0x20)
REG_PREAMBLE_LSB        = const(0x21)
REG_PAYLOAD_LENGTH      = const(0x22)
REG_FIFO_RX_BYTE_ADDR   = const(0x25)
REG_MODEM_CONFIG_3      = const(0x26)
REG_RSSI_WIDEBAND       = const(0x2C)
REG_DETECTION_OPTIMIZE  = const(0x31)
REG_DETECTION_THRESHOLD = const(0x37)
REG_SYNC_WORD           = const(0x39)
REG_DIO_MAPPING_1       = const(0x40)
REG_VERSION             = const(0x42)
REG_INVERTIQ            = const(0x33)
REG_INVERTIQ2           = const(0x3B)

# ============================================================================
# SX127x Mode and Power Settings
# ============================================================================
MODE_LONG_RANGE_MODE    = const(0x80)
MODE_SLEEP              = const(0x00)
MODE_STDBY              = const(0x01)
MODE_TX                 = const(0x03)
MODE_RX_CONTINUOUS      = const(0x05)
MODE_RX_SINGLE          = const(0x06)

PA_OUTPUT_RFO_PIN       = const(0)
PA_OUTPUT_PA_BOOST_PIN  = const(0x01)
PA_BOOST                = const(0x80)

# ============================================================================
# IRQ Masks
# ============================================================================
IRQ_TX_DONE_MASK        = const(0x08)
IRQ_PAYLOAD_CRC_ERROR_MASK = const(0x20)
IRQ_RX_DONE_MASK        = const(0x40)
IRQ_RX_TIME_OUT_MASK    = const(0x80)

# ============================================================================
# IQ Inversion Constants
# ============================================================================
RFLR_INVERTIQ_RX_MASK   = const(0xBF)
RFLR_INVERTIQ_RX_OFF    = const(0x00)
RFLR_INVERTIQ_RX_ON     = const(0x40)
RFLR_INVERTIQ_TX_MASK   = const(0xFE)
RFLR_INVERTIQ_TX_OFF    = const(0x01)
RFLR_INVERTIQ_TX_ON     = const(0x00)
RFLR_INVERTIQ2_ON       = const(0x19)
RFLR_INVERTIQ2_OFF      = const(0x1D)

# ============================================================================
# Other Definitions
# ============================================================================
MAX_PKT_LENGTH = const(255)
FifoTxBaseAddr = const(0x00)
FifoRxBaseAddr = const(0x00)

# ============================================================================
# Default LoRa Parameters
# ============================================================================
DEFAULT_PARAMETERS = {
    "frequency": 433000000,
    "frequency_offset": 0,
    "tx_power_level": 10,
    "signal_bandwidth": 125e3,
    "spreading_factor": 9,
    "coding_rate": 5,
    "preamble_length": 8,
    "implicitHeader": False,
    "sync_word": 0x12,
    "enable_CRC": True,
    "invert_IQ": False,
}

# ============================================================================
# ULoRa Class Definition
# ============================================================================
class ULoRa:
    """
    ULoRa class to interface with the SX127x LoRa module.
    """
    def __init__(self, spi, pins, parameters=None):
        """
        Initialize the LoRa module.
        
        :param spi: Initialized SPI object.
        :param pins: Dictionary with pin mappings, e.g.,
                     {"ss": , "reset": , "dio0": }.
        :param parameters: (Optional) Dictionary with LoRa configuration parameters.
        """
        self.spi = spi
        self.pins = pins
        self.parameters = DEFAULT_PARAMETERS.copy()
        if parameters:
            self.parameters.update(parameters)
        
        # Setup slave select (CS) pin
        self.pin_ss = Pin(self.pins["ss"], Pin.OUT)
        
        # Setup reset pin if provided and perform a hardware reset
        if "reset" in self.pins:
            self.pin_reset = Pin(self.pins["reset"], Pin.OUT)
            self.reset_module()
        else:
            self.pin_reset = None
        
        self.lock = False
        self.implicit_header_mode = None
        
        # Check LoRa module version
        version = None
        for _ in range(5):
            version = self.read_register(REG_VERSION)
            if version:
                break
        print("SX127x Version: {}".format(version))
        if version != 0x12:  # Expected version is 0x12 (18 in decimal)
            print("Bad LoRa Connection! Version: {}".format(version))
            machine.reset()
        else:
            print("LoRa Connection OK! Version: {}".format(version))
        
        # Put module in sleep mode for configuration
        self.sleep()
        
        # Configure LoRa parameters
        self.set_frequency(self.parameters["frequency"])
        self.set_signal_bandwidth(self.parameters["signal_bandwidth"])
        
        # Enable LNA boost and auto AGC
        self.write_register(REG_LNA, self.read_register(REG_LNA) | 0x03)
        self.write_register(REG_MODEM_CONFIG_3, 0x04)
        
        self.set_tx_power(self.parameters["tx_power_level"])
        self.set_implicit_header(self.parameters["implicitHeader"])
        self.set_spreading_factor(self.parameters["spreading_factor"])
        self.set_coding_rate(self.parameters["coding_rate"])
        self.set_preamble_length(self.parameters["preamble_length"])
        self.set_sync_word(self.parameters["sync_word"])
        self.enable_crc(self.parameters["enable_CRC"])
        self.invert_iq(self.parameters["invert_IQ"])
        
        # Enable LowDataRateOptimize if symbol duration > 16ms
        bw = self.parameters["signal_bandwidth"]
        sf = self.parameters["spreading_factor"]
        if (1000 / bw / (2 ** sf)) > 16:
            self.write_register(REG_MODEM_CONFIG_3, self.read_register(REG_MODEM_CONFIG_3) | 0x08)
        
        # Set FIFO base addresses
        self.write_register(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr)
        self.write_register(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr)
        
        self.standby()
    
    def reset_module(self):
        """
        Perform a hardware reset of the LoRa module using the reset pin.
        """
        if self.pin_reset is None:
            return
        # Drive reset pin low for 100ms then high
        self.pin_reset.value(0)
        sleep_ms(100)
        self.pin_reset.value(1)
        sleep_ms(100)
    
    # ---------------------------
    # Packet Transmission Methods
    # ---------------------------
    def begin_packet(self, implicit_header=None):
        """
        Prepare the module for packet transmission.
        
        :param implicit_header: (Optional) Boolean to override current header mode.
        """
        self.standby()
        if implicit_header is not None:
            self.set_implicit_header(implicit_header)
        # Reset FIFO pointer and payload length
        self.write_register(REG_FIFO_ADDR_PTR, FifoTxBaseAddr)
        self.write_register(REG_PAYLOAD_LENGTH, 0)
    
    def write(self, buffer):
        """
        Write data to the LoRa FIFO.
        
        :param buffer: Data bytes (or bytearray) to be sent.
        :return: Number of bytes written.
        """
        current_length = self.read_register(REG_PAYLOAD_LENGTH)
        size = len(buffer)
        # Ensure packet does not exceed maximum allowed length
        size = min(size, MAX_PKT_LENGTH - FifoTxBaseAddr - current_length)
        for byte in buffer:
            self.write_register(REG_FIFO, byte)
        # Update payload length
        self.write_register(REG_PAYLOAD_LENGTH, current_length + size)
        return size
    
    def end_packet(self):
        """
        Transmit the packet and wait until transmission is complete.
        """
        # Set module to TX mode
        self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX)
        # Wait until TX_DONE flag is set
        while (self.read_register(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0:
            pass
        # Clear TX_DONE flag
        self.write_register(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK)
    
    def println(self, message, implicit_header=None, repeat=1):
        """
        Transmit a text message.
        
        :param message: String message to transmit.
        :param implicit_header: (Optional) Boolean to override header mode.
        :param repeat: Number of times to send the message.
        """
        if isinstance(message, str):
            message = message.encode()
        self.begin_packet(implicit_header)
        self.write(message)
        for _ in range(repeat):
            self.end_packet()
        self.collect_garbage()
    
    # ---------------------------
    # Packet Reception Methods
    # ---------------------------
    def receive(self, size=0):
        """
        Set the module to continuous receive mode.
        
        :param size: Expected payload size. If >0, uses implicit header mode.
        """
        self.set_implicit_header(size > 0)
        if size > 0:
            self.write_register(REG_PAYLOAD_LENGTH, size & 0xFF)
        self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS)
    
    def listen(self, timeout=1000):
        """
        Listen for an incoming packet for a specified timeout.
        
        :param timeout: Timeout in milliseconds.
        :return: Received payload as bytes, or None if timeout occurs.
        """
        self.receive()
        start = ticks_ms()
        while True:
            if self.received_packet():
                return self.read_payload()
            if ticks_ms() - start > timeout:
                return None
    
    def received_packet(self, size=0):
        """
        Check if a packet has been received.
        
        :param size: Expected payload size. If >0, uses implicit header mode.
        :return: True if a packet is received, otherwise False.
        """
        irq_flags = self.get_irq_flags()
        self.set_implicit_header(size > 0)
        if size > 0:
            self.write_register(REG_PAYLOAD_LENGTH, size & 0xFF)
        if irq_flags == IRQ_RX_DONE_MASK:
            return True
        else:
            # If not in single RX mode, reset FIFO pointer and enter single RX mode
            if self.read_register(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE):
                self.write_register(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
                self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE)
        return False
    
    def read_payload(self):
        """
        Read the received payload from the FIFO.
        
        :return: Payload data as bytes.
        """
        # Set FIFO pointer to the current RX address
        self.write_register(REG_FIFO_ADDR_PTR, self.read_register(REG_FIFO_RX_CURRENT_ADDR))
        # Determine payload length based on header mode
        if self.implicit_header_mode:
            packet_length = self.read_register(REG_PAYLOAD_LENGTH)
        else:
            packet_length = self.read_register(REG_RX_NB_BYTES)
        payload = bytearray()
        for _ in range(packet_length):
            payload.append(self.read_register(REG_FIFO))
        self.collect_garbage()
        return bytes(payload)
    
    def get_irq_flags(self):
        """
        Retrieve and clear the IRQ flags.
        
        :return: IRQ flags value.
        """
        irq_flags = self.read_register(REG_IRQ_FLAGS)
        self.write_register(REG_IRQ_FLAGS, irq_flags)
        return irq_flags
    
    def packet_rssi(self, high_frequency=True):
        """
        Get the RSSI value of the last received packet.
        
        :param high_frequency: Boolean flag; if True, uses high frequency offset.
        :return: Adjusted RSSI value.
        """
        rssi = self.read_register(REG_PKT_RSSI_VALUE)
        return rssi - (157 if high_frequency else 164)
    
    def packet_snr(self):
        """
        Get the SNR (Signal-to-Noise Ratio) of the last packet.
        
        :return: SNR value.
        """
        return self.read_register(REG_PKT_SNR_VALUE) * 0.25
    
    # ---------------------------
    # Module Mode Methods
    # ---------------------------
    def standby(self):
        """
        Set the module to standby mode.
        """
        self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY)
    
    def sleep(self):
        """
        Put the module into sleep mode.
        """
        self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP)
    
    # ---------------------------
    # Configuration Methods
    # ---------------------------
    def set_tx_power(self, level, output_pin=PA_OUTPUT_PA_BOOST_PIN):
        """
        Set the transmission power level.
        
        :param level: Power level.
        :param output_pin: PA output type (PA_OUTPUT_RFO_PIN or PA_OUTPUT_PA_BOOST_PIN).
        """
        self.parameters["tx_power_level"] = level
        if output_pin == PA_OUTPUT_RFO_PIN:
            level = min(max(level, 0), 14)
            self.write_register(REG_PA_CONFIG, 0x70 | level)
        else:
            level = min(max(level, 2), 17)
            self.write_register(REG_PA_CONFIG, PA_BOOST | (level - 2))
    
    def set_frequency(self, frequency):
        """
        Set the operating frequency.
        
        :param frequency: Frequency in Hz.
        """
        self.parameters["frequency"] = frequency
        frequency += self.parameters["frequency_offset"]
        frf = (frequency << 19) // 32000000
        self.write_register(REG_FRF_MSB, (frf >> 16) & 0xFF)
        self.write_register(REG_FRF_MID, (frf >> 8) & 0xFF)
        self.write_register(REG_FRF_LSB, frf & 0xFF)
    
    def set_spreading_factor(self, sf):
        """
        Set the spreading factor (6 to 12).
        
        :param sf: Spreading factor.
        """
        sf = min(max(sf, 6), 12)
        if sf == 6:
            self.write_register(REG_DETECTION_OPTIMIZE, 0xC5)
            self.write_register(REG_DETECTION_THRESHOLD, 0x0C)
        else:
            self.write_register(REG_DETECTION_OPTIMIZE, 0xC3)
            self.write_register(REG_DETECTION_THRESHOLD, 0x0A)
        current = self.read_register(REG_MODEM_CONFIG_2) & 0x0F
        self.write_register(REG_MODEM_CONFIG_2, current | ((sf << 4) & 0xF0))
    
    def set_signal_bandwidth(self, sbw):
        """
        Set the signal bandwidth.
        
        :param sbw: Bandwidth in Hz.
        """
        # Predefined bandwidth bins
        bins = (7800, 10400, 15600, 20800, 31250, 41700, 62500, 125000, 250000)
        bw_index = 7  # Default to 125 kHz
        if sbw < 10:
            bw_index = int(sbw)
        else:
            for i, bw in enumerate(bins):
                if sbw <= bw:
                    bw_index = i
                    break
        current = self.read_register(REG_MODEM_CONFIG_1) & 0x0F
        self.write_register(REG_MODEM_CONFIG_1, current | (bw_index << 4))
    
    def set_coding_rate(self, denominator):
        """
        Set the coding rate.
        
        :param denominator: Denominator (between 5 and 8).
        """
        denominator = min(max(denominator, 5), 8)
        cr = denominator - 4
        current = self.read_register(REG_MODEM_CONFIG_1) & 0xF1
        self.write_register(REG_MODEM_CONFIG_1, current | (cr << 1))
    
    def set_preamble_length(self, length):
        """
        Set the preamble length.
        
        :param length: Preamble length.
        """
        self.write_register(REG_PREAMBLE_MSB, (length >> 8) & 0xFF)
        self.write_register(REG_PREAMBLE_LSB, length & 0xFF)
    
    def enable_crc(self, enable_crc):
        """
        Enable or disable CRC checking.
        
        :param enable_crc: Boolean flag.
        """
        modem_config_2 = self.read_register(REG_MODEM_CONFIG_2)
        if enable_crc:
            config = modem_config_2 | 0x04
        else:
            config = modem_config_2 & 0xFB
        self.write_register(REG_MODEM_CONFIG_2, config)
    
    def invert_iq(self, invert):
        """
        Invert the IQ signals.
        
        :param invert: Boolean flag.
        """
        self.parameters["invert_IQ"] = invert
        current = self.read_register(REG_INVERTIQ)
        if invert:
            new_val = (current & RFLR_INVERTIQ_TX_MASK & RFLR_INVERTIQ_RX_MASK) | RFLR_INVERTIQ_RX_ON | RFLR_INVERTIQ_TX_ON
            self.write_register(REG_INVERTIQ, new_val)
            self.write_register(REG_INVERTIQ2, RFLR_INVERTIQ2_ON)
        else:
            new_val = (current & RFLR_INVERTIQ_TX_MASK & RFLR_INVERTIQ_RX_MASK) | RFLR_INVERTIQ_RX_OFF | RFLR_INVERTIQ_TX_OFF
            self.write_register(REG_INVERTIQ, new_val)
            self.write_register(REG_INVERTIQ2, RFLR_INVERTIQ2_OFF)
    
    def set_sync_word(self, sw):
        """
        Set the synchronization word.
        
        :param sw: Sync word.
        """
        self.write_register(REG_SYNC_WORD, sw)
    
    def set_implicit_header(self, implicit):
        """
        Set the module to use implicit or explicit header mode.
        
        :param implicit: Boolean flag.
        """
        if self.implicit_header_mode != implicit:
            self.implicit_header_mode = implicit
            modem_config_1 = self.read_register(REG_MODEM_CONFIG_1)
            if implicit:
                config = modem_config_1 | 0x01
            else:
                config = modem_config_1 & 0xFE
            self.write_register(REG_MODEM_CONFIG_1, config)
    
    # ---------------------------
    # Low-Level SPI Methods
    # ---------------------------
    def read_register(self, address):
        """
        Read a byte from the specified register.
        
        :param address: Register address.
        :return: Value read.
        """
        response = self.transfer(address & 0x7F)
        return int.from_bytes(response, 'big')
    
    def write_register(self, address, value):
        """
        Write a byte to the specified register.
        
        :param address: Register address.
        :param value: Value to write.
        """
        self.transfer(address | 0x80, value)
    
    def transfer(self, address, value=0x00):
        """
        Perform an SPI transfer.
        
        :param address: Register address.
        :param value: Byte to write.
        :return: Response as a bytearray.
        """
        response = bytearray(1)
        self.pin_ss.value(0)
        # Write the register address
        self.spi.write(bytes([address]))
        # Write the value and simultaneously read the response
        self.spi.write_readinto(bytes([value]), response)
        self.pin_ss.value(1)
        return response
    
    def dump_registers(self):
        """
        Dump the first 128 registers for debugging purposes.
        """
        for i in range(128):
            print("0x{:02X}: {:02X}".format(i, self.read_register(i)), end="")
            if (i+1) % 4 == 0:
                print()
            else:
                print(" | ", end="")
    
    def collect_garbage(self):
        """
        Run garbage collection.
        """
        gc.collect()

 ssd1308.py


# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit

import time
import framebuf

# register definitions
SET_CONTRAST        = const(0x81)
SET_ENTIRE_ON       = const(0xa4)
SET_NORM_INV        = const(0xa6)
SET_DISP            = const(0xae)
SET_MEM_ADDR        = const(0x20)
SET_COL_ADDR        = const(0x21)
SET_PAGE_ADDR       = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP       = const(0xa0)
SET_MUX_RATIO       = const(0xa8)
SET_COM_OUT_DIR     = const(0xc0)
SET_DISP_OFFSET     = const(0xd3)
SET_COM_PIN_CFG     = const(0xda)
SET_DISP_CLK_DIV    = const(0xd5)
SET_PRECHARGE       = const(0xd9)
SET_VCOM_DESEL      = const(0xdb)
SET_CHARGE_PUMP     = const(0x8d)


class SSD1306:
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        # Note the subclass must initialize self.framebuf to a framebuffer.
        # This is necessary because the underlying data buffer is different
        # between I2C and SPI implementations (I2C needs an extra byte).
        self.poweron()
        self.init_display()

    def init_display(self):
        for cmd in (
            SET_DISP | 0x00, # off
            # address setting
            SET_MEM_ADDR, 0x00, # horizontal
            # resolution and layout
            SET_DISP_START_LINE | 0x00,
            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
            SET_MUX_RATIO, self.height - 1,
            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
            SET_DISP_OFFSET, 0x00,
            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV, 0x80,
            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
            SET_VCOM_DESEL, 0x30, # 0.83*Vcc
            # display
            SET_CONTRAST, 0xff, # maximum
            SET_ENTIRE_ON, # output follows RAM contents
            SET_NORM_INV, # not inverted
            # charge pump
            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01): # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        self.write_cmd(SET_DISP | 0x00)

    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width == 64:
            # displays with width of 64 pixels are shifted by 32
            x0 += 32
            x1 += 32
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_framebuf()

    def fill(self, col):
        self.framebuf.fill(col)

    def pixel(self, x, y, col):
        self.framebuf.pixel(x, y, col)

    def scroll(self, dx, dy):
        self.framebuf.scroll(dx, dy)

    def text(self, string, x, y, col=1):
        self.framebuf.text(string, x, y, col)


class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        # Add an extra byte to the data buffer to hold an I2C data/command byte
        # to use hardware-compatible I2C transactions.  A memoryview of the
        # buffer is used to mask this byte from the framebuffer operations
        # (without a major memory hit as memoryview doesn't copy to a separate
        # buffer).
        self.buffer = bytearray(((height // 8) * width) + 1)
        self.buffer[0] = 0x40  # Set first byte of data buffer to Co=0, D/C=1
        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.temp[0] = 0x80 # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_framebuf(self):
        # Blast out the frame buffer using a single I2C transaction to support
        # hardware I2C interfaces.
        self.i2c.writeto(self.addr, self.buffer)

    def poweron(self):
        pass


class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        self.buffer = bytearray((height // 8) * width)
        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.low()
        self.cs.low()
        self.spi.write(bytearray([cmd]))
        self.cs.high()

    def write_framebuf(self):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.high()
        self.cs.low()
        self.spi.write(self.buffer)
        self.cs.high()

    def poweron(self):
        self.res.high()
        time.sleep_ms(1)
        self.res.low()
        time.sleep_ms(10)
        self.res.high()

 led_driver.py


 


from machine import Pin, SoftI2C
import ssd1308
from time import sleep

# Настройка дисплея
i2c = SoftI2C(scl=Pin(22), sda=Pin(21))
oled = ssd1308.SSD1306_I2C(128, 64, i2c, addr=0x3C)

def print_like_console(text, x=0, y=0, delay=0.05):
    oled.fill(0)
    for i in range(len(text)):
        oled.text(text[i], x + i * 6, y)  # 6 пикселей на символ
        oled.show()
        sleep(delay)
        
        
def print_multiline_console(text_lines, delay=0.03):
    oled.fill(0)
    line_height = 9  # чуть больше 8, чтобы строки не слипались
    for idx, line in enumerate(text_lines):
        oled.text(line, 0, idx * line_height)
        oled.show()
        sleep(delay)
        
        
#print_like_console('hhhhhh', x=0, y=0, delay=0.05)

 main.py

 Настройки меняем в коде:

# Диапазон для сканирования (в MHz)
SCAN_RANGE = [410, 421]  # частота от и до
SCAN_STEP = 0.1  # MHz шаг сканирования
SCAN_TIME = 1.0  # время сканирования в секундах
 

# lora_scanner.py - LoRa Channel Scanner
import machine
from machine import SPI, Pin
import utime
from ulora import ULoRa
from led_driver import print_like_console, print_multiline_console

# Конфигурация LoRa
LORA_PARAMS = {
    "frequency": 433000000,  # Начальная частота
    "tx_power_level": 18,
    "signal_bandwidth": 125e3,
    "spreading_factor": 12,
    "coding_rate": 8,
    "preamble_length": 12,
    "implicitHeader": False,
    "sync_word": 0x12,
    "enable_CRC": True,
    "invert_IQ": False,
}

LORA_PINS = {
    "ss": 5,
    "reset": 14,
    "dio0": 2,
}

# Диапазон для сканирования (в MHz)
SCAN_RANGE = [410, 421]  # MHz
SCAN_STEP = 0.1  # MHz
SCAN_TIME = 1.0  # seconds per frequency

spi = SPI(1, baudrate=10000000, polarity=0, phase=0,
          sck=Pin(18), mosi=Pin(23), miso=Pin(19))
lora = ULoRa(spi, LORA_PINS, LORA_PARAMS)
led = Pin(25, Pin.OUT)

def blink_led(times=1, delay=100):
    for _ in range(times):
        led.value(1)
        utime.sleep_ms(delay)
        led.value(0)
        utime.sleep_ms(delay)

def scan_channels():
    """Сканирование диапазона частот и анализ загруженности"""
    results = []
    
    # Генерация списка частот для сканирования (в герцах)
    start_freq = int(SCAN_RANGE[0] * 1000000)  # Convert to Hz
    end_freq = int(SCAN_RANGE[1] * 1000000)
    step = int(SCAN_STEP * 1000000)
    
    frequencies = list(range(start_freq, end_freq + 1, step))

    print(f"Scanning {len(frequencies)} channels...")
    print_multiline_console(["Scanning...", f"0/{len(frequencies)}", ""], 0)

    for i, freq in enumerate(frequencies):
        # Установка новой частоты
        lora.set_frequency(freq)
        
        # Обновление информации на дисплее
        progress = f"{i+1}/{len(frequencies)}"
        freq_info = f"{freq/1000000:.1f}MHz"
        print_multiline_console(["Scanning...", progress, freq_info], 0)
        
        packet_count = 0
        rssi_values = []
        start_time = utime.ticks_ms()
        
        # Сбор статистики за указанное время
        while utime.ticks_diff(utime.ticks_ms(), start_time) < SCAN_TIME * 1000:
            if lora.received_packet():
                lora.read_payload()  # Clear buffer
                packet_count += 1
                rssi_values.append(lora.packet_rssi())
            utime.sleep_ms(10)
        
        # Расчет средней мощности сигнала
        avg_rssi = sum(rssi_values) / len(rssi_values) if rssi_values else -120
        
        results.append({
            'frequency': freq,
            'packets': packet_count,
            'avg_rssi': avg_rssi,
            'activity': packet_count / SCAN_TIME  # Пакетов в секунду
        })
        
        blink_led(1, 50)  # Индикация прогресса

    return results

def analyze_results(results):
    """Анализ и отображение результатов сканирования"""
    # Сортировка по активности
    sorted_results = sorted(results, key=lambda x: x['activity'], reverse=True)
    
    print("\n=== SCAN RESULTS ===")
    print("Freq(MHz)\tPkts/s\tRSSI(dBm)\tStatus")
    
    for res in sorted_results[:10]:  # Показать топ-10 каналов
        freq_mhz = res['frequency'] / 1000000
        status = "HIGH TRAFFIC" if res['activity'] > 5 else "MODERATE" if res['activity'] > 1 else "LOW"
        
        print(f"{freq_mhz:.1f}\t\t{res['activity']:.1f}\t{res['avg_rssi']:.1f}\t\t{status}")
    
    # Поиск наименее загруженного канала
    best_channel = min(results, key=lambda x: x['activity'])
    best_freq = best_channel['frequency'] / 1000000
    
    print(f"\nRecommended channel: {best_freq:.1f} MHz")
    print(f"Activity: {best_channel['activity']:.1f} packets/sec")
    print(f"Signal strength: {best_channel['avg_rssi']:.1f} dBm")
    
    # Вывод на дисплей
    display_lines = [
        "Scan Complete!",
        f"Best: {best_freq:.1f}MHz",
        f"Activity: {best_channel['activity']:.1f}p/s",
        f"RSSI: {best_channel['avg_rssi']:.1f}dBm"
    ]
    print_multiline_console(display_lines, 0.05)

# Основной цикл
try:
    print("Starting LoRa channel scanner...")
    scan_results = scan_channels()
    analyze_results(scan_results)
    
except KeyboardInterrupt:
    print("\nScan interrupted")
except Exception as e:
    print(f"Error: {e}")
finally:
    lora.sleep()
    print("Scanner stopped")

Комментарии

Популярные сообщения из этого блога

Meshtastic ESP32 E22

LORA Upgrade E32 400M30S

Установка micropython на ESP32