Telegram bot esp32 control

Управление светодиодом через ТГ бота. 

Кнопки ВКЛ , ВЫКЛ и Статус.
Можно подключить реле и управлять лампочкой, розеткой и т д. 
 
 

 


Для нормальной работы нужно добавить свои данные SSID , пароль , токен тг бота и ID  пользователя.
Токен ТГ бота получаем у бота  @BotFather



main.py     

# reliable_bot.py
import network, time, urequests
from machine import Pin
import socket

WIFI_SSID = ""
WIFI_PASS = ""
BOT_TOKEN = ""
CHAT_ID = 

BASE = "https://api.telegram.org/bot" + BOT_TOKEN
led = Pin(2, Pin.OUT)
led.off()
offset = 0

# WiFi helper
wlan = network.WLAN(network.STA_IF)

def wifi_connect():
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting WiFi...")
        wlan.connect(WIFI_SSID, WIFI_PASS)
        t0 = time.time()
        while not wlan.isconnected():
            # если не подключились за 20s — перезапуск попытки
            if time.time() - t0 > 20:
                print("Still not connected, retrying...")
                wlan.disconnect()
                wlan.connect(WIFI_SSID, WIFI_PASS)
                t0 = time.time()
            time.sleep(0.5)
    print("WiFi connected:", wlan.ifconfig())

def tcp_check(host="api.telegram.org", port=443, timeout=5):
    try:
        ai = socket.getaddrinfo(host, port)[0][-1]
        s = socket.socket()
        s.settimeout(timeout)
        s.connect(ai)
        s.close()
        return True
    except Exception as e:
        print("tcp_check failed:", e)
        return False

# send keyboard
def send_kb():
    urequests.post(
        BASE + "/sendMessage",
        json={
            "chat_id": CHAT_ID,
            "text": "LED CONTROL",
            "reply_markup": {
                "inline_keyboard": [
                    [{"text": "ON", "callback_data": "on"},
                     {"text": "OFF", "callback_data": "off"},
                     {"text": "STATUS", "callback_data": "status"}]
                ]
            }
        }
    ).close()
    
    
def send_mess(text):
    t = f'{text}'
    urequests.post(
        BASE + "/sendMessage",
        json={
            "chat_id": CHAT_ID,
            "text": t,
            "reply_markup": {
                "inline_keyboard": [
                    [{"text": "ON", "callback_data": "on"},
                     {"text": "OFF", "callback_data": "off"},
                     {"text": "STATUS", "callback_data": "status"}]
                ]
            }
        }
    ).close()
    
    
    

def answer_callback(cb_id,t):
    try:
        r = urequests.post(
            BASE + "/answerCallbackQuery",
            json={"callback_query_id": cb_id}
        )
        r.close()
    except:
        pass   # игнорируем полностью


# START
wifi_connect()

# quick pre-check
if not tcp_check():
    print("Warning: can't reach api.telegram.org — try hotspot or reboot router")

send_kb()

backoff = 1
while True:
    try:
        # если Wi-Fi упал — переподключаем
        if not wlan.isconnected():
            print("WiFi lost, reconnecting...")
            wifi_connect()

        url = BASE + "/getUpdates?timeout=8&offset={}".format(offset)
        r = urequests.get(url)
        data = r.json()
        r.close()

        # reset backoff on success
        backoff = 1

        for u in data.get("result", []):
            offset = u["update_id"] + 1
            if "callback_query" in u:
                cb = u["callback_query"]
                d = cb.get("data")
                if d == "on":
                    led.on()
                    answer_callback(cb["id"], "LED ВКЛ")
                elif d == "off":
                    led.off()
                    answer_callback(cb["id"], "LED ВЫКЛ")
                elif d == "status":
                    st = "ВКЛ" if led.value() else "ВЫКЛ"
                    answer_callback(cb["id"], "LED: " + st)
                    print(f'status {st}')
                    #l = send_message(f'status {st}')
                    #print(l)
                    send_mess(f'status {st}')
                # обновляем клавиатуру (можно убрать, чтобы не спамить)
                #send_kb()
        time.sleep(0.5)

    except OSError as e:
        print("OSError:", e)
        # типично ECONNABORTED/ETIMEDOUT — делаем экспоненциальный бэкофф
        time.sleep(backoff)
        backoff = min(30, backoff * 2)
        # попробуем восстановить TCP before next iteration
        if not tcp_check():
            print("TCP still failing, reconnecting WiFi and waiting")
            try:
                wlan.disconnect()
            except:
                pass
            wifi_connect()

    except ValueError as e:
        # JSON decode error
        print("ValueError (bad json):", e)
        time.sleep(1)

    except Exception as e:
        print("Unexpected error:", e)
        time.sleep(3)




Комментарии

Популярные сообщения из этого блога

Установка micropython на ESP32

LORA Приемник и передатчик

LORA Upgrade E32 400M30S