Telegram bot esp32 control
main.py
# reliable_bot.py
import network, time, urequests
from machine import Pin
import socket
WIFI_SSID = ""
WIFI_PASS = ""
BOT_TOKEN = ""
CHAT_ID =
BASE = "https://api.telegram.org/bot" + BOT_TOKEN
led = Pin(2, Pin.OUT)
led.off()
offset = 0
# WiFi helper
wlan = network.WLAN(network.STA_IF)
def wifi_connect():
wlan.active(True)
if not wlan.isconnected():
print("Connecting WiFi...")
wlan.connect(WIFI_SSID, WIFI_PASS)
t0 = time.time()
while not wlan.isconnected():
# если не подключились за 20s — перезапуск попытки
if time.time() - t0 > 20:
print("Still not connected, retrying...")
wlan.disconnect()
wlan.connect(WIFI_SSID, WIFI_PASS)
t0 = time.time()
time.sleep(0.5)
print("WiFi connected:", wlan.ifconfig())
def tcp_check(host="api.telegram.org", port=443, timeout=5):
try:
ai = socket.getaddrinfo(host, port)[0][-1]
s = socket.socket()
s.settimeout(timeout)
s.connect(ai)
s.close()
return True
except Exception as e:
print("tcp_check failed:", e)
return False
# send keyboard
def send_kb():
urequests.post(
BASE + "/sendMessage",
json={
"chat_id": CHAT_ID,
"text": "LED CONTROL",
"reply_markup": {
"inline_keyboard": [
[{"text": "ON", "callback_data": "on"},
{"text": "OFF", "callback_data": "off"},
{"text": "STATUS", "callback_data": "status"}]
]
}
}
).close()
def send_mess(text):
t = f'{text}'
urequests.post(
BASE + "/sendMessage",
json={
"chat_id": CHAT_ID,
"text": t,
"reply_markup": {
"inline_keyboard": [
[{"text": "ON", "callback_data": "on"},
{"text": "OFF", "callback_data": "off"},
{"text": "STATUS", "callback_data": "status"}]
]
}
}
).close()
def answer_callback(cb_id,t):
try:
r = urequests.post(
BASE + "/answerCallbackQuery",
json={"callback_query_id": cb_id}
)
r.close()
except:
pass # игнорируем полностью
# START
wifi_connect()
# quick pre-check
if not tcp_check():
print("Warning: can't reach api.telegram.org — try hotspot or reboot router")
send_kb()
backoff = 1
while True:
try:
# если Wi-Fi упал — переподключаем
if not wlan.isconnected():
print("WiFi lost, reconnecting...")
wifi_connect()
url = BASE + "/getUpdates?timeout=8&offset={}".format(offset)
r = urequests.get(url)
data = r.json()
r.close()
# reset backoff on success
backoff = 1
for u in data.get("result", []):
offset = u["update_id"] + 1
if "callback_query" in u:
cb = u["callback_query"]
d = cb.get("data")
if d == "on":
led.on()
answer_callback(cb["id"], "LED ВКЛ")
elif d == "off":
led.off()
answer_callback(cb["id"], "LED ВЫКЛ")
elif d == "status":
st = "ВКЛ" if led.value() else "ВЫКЛ"
answer_callback(cb["id"], "LED: " + st)
print(f'status {st}')
#l = send_message(f'status {st}')
#print(l)
send_mess(f'status {st}')
# обновляем клавиатуру (можно убрать, чтобы не спамить)
#send_kb()
time.sleep(0.5)
except OSError as e:
print("OSError:", e)
# типично ECONNABORTED/ETIMEDOUT — делаем экспоненциальный бэкофф
time.sleep(backoff)
backoff = min(30, backoff * 2)
# попробуем восстановить TCP before next iteration
if not tcp_check():
print("TCP still failing, reconnecting WiFi and waiting")
try:
wlan.disconnect()
except:
pass
wifi_connect()
except ValueError as e:
# JSON decode error
print("ValueError (bad json):", e)
time.sleep(1)
except Exception as e:
print("Unexpected error:", e)
time.sleep(3)


Комментарии
Отправить комментарий