Хотели бы иметь собственного бота, который будет помощником, сигнализацией, пультом от всего, а заодно и кроссплатформенным приложением, работающим на всех ваших девайсах?
За последние пол года я написал около полусотни разных ботов для мессенджера Telegram. Пора немного поделиться хотя бы простыми примерами 🙂
Для тех, кто не любит читать, мы подготовили небольшой видеообзор, демонстрирующий возможности связок, описанных ниже.
Осторожно, будет много букаф!
Зачем мне бот от Telegram?
Телеграм, или в народе «телега», «телек», — очень крутой мессенджер! И не только тем, что он безопасный и удобный, а еще и тем, что у него открытое api. Такие программы мы любим. Это значит, что можно объединить все самое важное и критично-необходимое в одном мессенджере. С точки зрения удобства использования, бот — это всего лишь ваш контакт в телефоне, который умеет «кое-что еще». С практической точки зрения — целый сервер домашней автоматизации (Raspberry pi или любой другой МК) можно подключить к управлению и мониторингу через бота телеграма в вашем телефоне. При этом вы не заботитесь о «белой» IP адресации, или транзитном облаке для вашего трафика. Весь транспорт берет на себя Telegram! На устройстве, где будет написан ваш бот, просто достаточно иметь выход в сеть Интернет, но этим дело не ограничивается. С помощью бота можно соединить кучу областей жизнедеятельности и работы: напоминание покормить кота, вывод квартальных отчетов, управление светом на даче и т.д. и т. п. На что хватит фантазии.
Сегодня мы остановимся на небольшом симбиозе контроля и управления. Для примера нам понадобятся:
- датчик температуры и влажности
- датчик контроля протечек
- датчик движения или объема
- реле 220v c 3-5 вольтовой логикой управления
Сразу оговорюсь, что список может быть не полным или вы можете что-то выкинуть из него. Что подключать, решать вам. Как подключить все эти устройства и проверить их минимальную работу, можно посмотреть по ссылкам ниже.
Si7021: raspberry + датчик температуры и влажности
YL-83: raspberry + датчик контроля протечек
Dfrobot ir motion sensor: raspberry pi + датчик движения
Реле 220v: raspberry pi подключение реле
Создание бота Telegram
Естественное условие — наличие телеграма на вашем девайсе (телефон, пк, планшет — без разницы).
Открываем Telegram, идем в контакты и ищем в поисковой строке @botfather. Кликаем на него. Это мастер или менеджер создания и управления всех ваших ботов.
Вводим в строку ввода «/» и нажимаем «/newbot». Далее все по инструкции.
В конечном итоге, после того как вы придумаете имя контакта и уникальное имя и успешно все создадите, у вас появится TOKEN. Никому не говорите его и не пересылайте, вы будете использовать его в качестве ключа авторизации в ваших программах. При желании можете посмотреть остальные доступные команды, все они начинаются со «/». Например, вы можете создать аватарку вашему боту.
И как мне Telegram-бота написать?
Терпение, мой мальчик, и их_теозавры станут нашими! Для начала хотел бы поделиться очень удобным и простым приложением-помощником для работы с API Telegram — Telepot
Также для реализации всех возможностей потребуется Python3.5. Как установить Python3.5 вы можете прочитать в отдельной статье.
После того как Python3.5 установлен, ставим Telepot из pip:
1 |
$ pip3.5 install telepot |
После установки приложения убедитесь, что >>> import telepot проходит успешно в python3.5
Больше кода богу кода!
В сети видел достаточно примеров ботов, но, если честно, бОльшая часть не красива с точки зрения пользователя. Очень часто людям приходится писать в телефоне команды, а не кликать. Даже если вы видели примеры с кликами, чаще всего они со слешами в начале, которые, мягко говоря, раздражают людей. В данном примере мы попытаемся это исправить. Будем делать вот такие красивые клавиатуры и кнопки со своим содержимым и логикой работы, как на картинке ниже:
В боте будут присутствовать блок управления реле, блок с отображением текущей информации используемых датчиков (влажность/температура/вода/обратная связь от реле) и блок настроек сигнализаций. Последний блок — это такой же блок управления, только не датчиками, а программами или их параметрами. Так же хочу отметить, что будет настроена фильтрация по разрешенным персональным id телеграмма. Данная навеска позволит иметь доступ к боту только вам и вашим близким. Если кратко, то это самые необходимые примеры работы бота в различных проектах. Произвольные клавиатуры могут содержать текст, вызов контакта, локации и еще много чего. «Волшебные серые кнопки» имеют не меньший функционал. Если вы хотите более подробно разобраться с возможностями элементов — обратитесь к официальной документации api.
Поехали. Создаем тело бота:
1 2 |
$ sudo -i # vim /home/pi/bot.py |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 |
#!/usr/bin/env python3.5 # -*- coding: utf-8 -*- #project: home-smart-home.ru import subprocess from subprocess import Popen, PIPE import sys,os import asyncio import telepot import telepot.aio from telepot.namedtuple import ReplyKeyboardMarkup, KeyboardButton, ReplyKeyboardRemove, ForceReply from telepot.namedtuple import InlineKeyboardMarkup, InlineKeyboardButton from telepot.namedtuple import InlineQueryResultArticle, InlineQueryResultPhoto, InputTextMessageContent #Тут должны находиться ваши айдишники (для примера я сделал 2 разрешенных) #Вы можете запустить бота и увидеть при нажатии меню или /start ваш личный айдишник #Ваши разрешенные айди нужно прописать в переменных chat_allow, заменив None на айдишники #chat_allow1=123456789 #chat_allow2=987654321 chat_allow1=None chat_allow2=None # Ниже пути расположения скриптов чтения значений датчиков и управление реле. # Каждый файл - исполняемый питоновский скрипт. # Необходимо чтобы все файлы были представлены в системе и были исполняемыми. file_read_temp = '/home/pi/si7021_temp.py' file_read_hum = '/home/pi/si7021_hum.py' file_read_water = '/home/pi/water_read.py' file_read_relay = '/home/pi/relay_state.py' file_relay_on = '/home/pi/relay_on.py' file_relay_off = '/home/pi/relay_off.py' # Блок переменных ниже - блок файлов-индификаторов (просто айдишник), отвечающиx за состояние сигнализации. # Для них в директории /home/pi создается отдельная директория (/home/pi/alert_state) # Если файл сигнализации присутствует - значит сигнализация должна слать алерты при срабатывании. # Если файл отсутствует значит считаем что сигнализация выключена. Необходимо для перезагрузок (обесточевания) # И других программ которые хотят знать что с сигнализацией water_id = "/home/pi/alert_state/w_on" motion_id = "/home/pi/alert_state/m_on" temper_id = "/home/pi/alert_state/t_on" # Файл со значением минимального температурного порога срабатывания температурной сигналки # файл должен существовать и необходимо внести в него значение температуры (целое число) critical_temp = "/home/pi/alert_state/critical_temp" # считывание температуры из скрипта для si7021 def temp_read(): proc = Popen(['%s' %file_read_temp], shell=True, stdout=PIPE, stderr=PIPE) proc.wait() t = proc.stdout.read() t = float(t) return t # считывание влажности из скрипта для si7021 def hum_read(): proc = Popen(['%s' %file_read_hum], shell=True, stdout=PIPE, stderr=PIPE) proc.wait() H = proc.stdout.read() H = float(H) return H # считывание значения с датчика воды def water_read(): proc = Popen(['%s' %file_read_water], shell=True, stdout=PIPE, stderr=PIPE) proc.wait() w = proc.communicate()[0] w = w.decode(encoding='utf-8') return w # считывание состояния пина на котором висит реле def relay_read(): proc = Popen(['%s' %file_read_relay], shell=True, stdout=PIPE, stderr=PIPE) proc.wait() r = proc.communicate()[0] r = int(r) if r == 1: r='Реле включено' elif r==0: r='Реле обесточено' else: r='Ошибка!' return r # включение/выключение реле в зависимости от входящего параметра def relay_execute(state): if state == 'on' and relay_read() == 'Реле обесточено': subprocess.call("%s" %file_relay_on, shell=True) text = "включаю реле" elif state == 'on' and relay_read() == 'Реле включено': text = "реле уже под напряжением" elif state == 'off' and relay_read() == 'Реле включено': subprocess.call("%s" %file_relay_off, shell=True) text = "отключаю реле" elif state == 'off' and relay_read() == 'Реле обесточено': text = "реле уже обесточено" else: print("Ошибка!") return text # управление сигнализациями alarm: on/off. file_id - айдишник сигнализации (см выше) def alert_f(alarm, file_id): #сигнализация уже включена if alarm == 'on' and os.path.exists(file_id): text = "Сигнализация уже была включена" #была включена, теперь отключаем elif alarm == 'off' and os.path.exists(file_id): text = "Отключаю сигнализацию" subprocess.call("rm -f %s" %file_id, shell=True) #уже была выключена, выключать не надо elif alarm == 'off' and os.path.exists(file_id) == False: text = "Сигнализация уже была отключена" #выла выключена, теперь включаем elif alarm == 'on' and os.path.exists(file_id) == False: text = "Активирую сигнализацию" subprocess.call("touch %s" %file_id, shell=True) else: text = "err" return text # текущее сотояние сигнализации. file_id - айдишник сигнализации (см выше) def alert_info_f(file_id): if os.path.exists(file_id): text = "Сигнализация сейчас активна" else: text = "Сигнализация сейчас отключена" return text # Текущее минимальное значение температуры # считывание значения с датчика воды def c_t_read(): proc = Popen(['cat %s' %critical_temp], shell=True, stdout=PIPE, stderr=PIPE) proc.wait() c_t = proc.communicate()[0] c_t = c_t.decode(encoding='utf-8') c_t = "\nПорог срабатывания установлен на "+c_t+" градусов" return c_t ################################# #Блоки дальше - тело самого бота# ################################# message_with_inline_keyboard = None id_write_critical_temper = 0 #эта функция отвечает за текстовые сообщения и "клавиатуру" async def on_chat_message(msg): global id_write_critical_temper content_type, chat_type, chat_id = telepot.glance(msg) print('Chat:', content_type, chat_type) print("id отправителя сообщения: "+str(chat_id)) if chat_id == chat_allow1 or chat_id == chat_allow2: if content_type != 'text': return else: ok=1 command = msg['text'].lower() print(command) if command == '/start': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='инфо')], [dict(text='управление')], [dict(text='сигнализация')], ]) await bot.sendMessage(chat_id, 'чем воспользуешься?', reply_markup=markup) elif command == 'главное меню': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='инфо')], [dict(text='управление')], [dict(text='сигнализация')], ]) await bot.sendMessage(chat_id, 'выбери раздел', reply_markup=markup) elif command == u'инфо': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='вода'), dict(text='розетка')], [dict(text='температура'), dict(text='влажность')], [dict(text='главное меню')], ]) await bot.sendMessage(chat_id, 'выбери объект', reply_markup=markup) elif command == u'управление': markup = InlineKeyboardMarkup(inline_keyboard=[ [dict(text='включить', callback_data='relay_on'), dict(text='отключить', callback_data='relay_off')], [dict(text='текущее состояние', callback_data='relay_info')], ]) global message_with_inline_keyboard message_with_inline_keyboard = await bot.sendMessage(chat_id, 'Что сделать с розеткой?', reply_markup=markup) elif command == u'сигнализация': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='контроль воды')], [dict(text='контроль движения')], [dict(text='контроль температуры')], [dict(text='главное меню')], ]) await bot.sendMessage(chat_id, 'какой раздел необходим?', reply_markup=markup) elif command == u'температура': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='вода'), dict(text='розетка')], [dict(text='температура'), dict(text='влажность')], [dict(text='главное меню')] ]) #считываем значение с датчика температуры t = str(temp_read())+'C°' await bot.sendMessage(chat_id, 'Текущая температура: %s' %t, reply_markup=markup) elif command == u'влажность': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='вода'), dict(text='розетка')], [dict(text='температура'), dict(text='влажность')], [dict(text='главное меню')] ]) #считываем значение с датчика влажности h = str(hum_read())+'%' await bot.sendMessage(chat_id, 'Текущая влажность: %s' %h, reply_markup=markup) elif command == u'вода': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='вода'), dict(text='розетка')], [dict(text='температура'), dict(text='влажность')], [dict(text='главное меню')] ]) #считываем значение с датчика воды w = str(water_read()) await bot.sendMessage(chat_id, 'Текущее состояние сенсора воды: %s' %w, reply_markup=markup) elif command == u'розетка': markup = ReplyKeyboardMarkup(keyboard=[ [dict(text='вода'), dict(text='розетка')], [dict(text='температура'), dict(text='влажность')], [dict(text='главное меню')] ]) #считываем значение с пина, на который подключено реле R=str(relay_read()) await bot.sendMessage(chat_id, 'Состояние розетки (реле): %s' %R, reply_markup=markup) elif command == u'контроль воды': markup = InlineKeyboardMarkup(inline_keyboard=[ [dict(text='включить', callback_data='water_on'), dict(text='отключить', callback_data='water_off')], [dict(text='текущее состояние', callback_data='water_alert_info')], ]) message_with_inline_keyboard = await bot.sendMessage(chat_id, 'Опции сигнализации воды:', reply_markup=markup) elif command == u'контроль движения': markup = InlineKeyboardMarkup(inline_keyboard=[ [dict(text='включить', callback_data='motion_on'), dict(text='отключить', callback_data='motion_off')], [dict(text='текущее состояние', callback_data='motion_alert_info')], ]) message_with_inline_keyboard = await bot.sendMessage(chat_id, 'Опции сигнализации движения:', reply_markup=markup) elif command == u'контроль температуры': markup = InlineKeyboardMarkup(inline_keyboard=[ [dict(text='включить', callback_data='temp_on'), dict(text='отключить', callback_data='temp_off')], [dict(text='порог срабатывания', callback_data='temp_alert_min')], [dict(text='текущее состояние', callback_data='temp_alert_info')], ]) message_with_inline_keyboard = await bot.sendMessage(chat_id, 'Опции сигнализации температуры:', reply_markup=markup) else: if id_write_critical_temper == 1: #если происходит установка температуры срабатывания if command.isdigit(): subprocess.call("echo %s > %s" %(command, critical_temp), shell=True) markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text='главное меню')]]) await bot.sendMessage(chat_id, str("Температурный минимум установлен в %s градусов. Ниже этой температуры будут приходить алерты") %command, reply_markup=markup) id_write_critical_temper = 0 else: markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text='главное меню')]]) await bot.sendMessage(chat_id, str("%s - это не целое число. При необходимости пройдите настройку заново. Значение не установлено!") %command, reply_markup=markup) id_write_critical_temper = 0 else: #если ввели текст, не соответствующий команде await bot.sendMessage(chat_id, str("начните чат с команды /start")) else: #если чат айди не соответствует разрешенному markup_protect = ReplyKeyboardMarkup(keyboard=[[dict(text='я очень тугой, еще раз можно?')]]) await bot.sendMessage(chat_id, 'Вы не имеете доступа к этому боту! Обратитесь к владельцу за разрешением.', reply_markup=markup_protect) return #эта функция отвечает за "волшебные полупрозрачные кнопки" async def on_callback_query(msg): global id_write_critical_temper query_id, from_id, data = telepot.glance(msg, flavor='callback_query') print('Callback query:', query_id, data) id_owner_callback=msg['from']['id'] print("id отправителя запроса: "+str(id_owner_callback)) if id_owner_callback == chat_allow1 or id_owner_callback == chat_allow2: #управление реле (розеткой) if data == 'relay_on': R_inf = str(relay_execute('on')) await bot.answerCallbackQuery(query_id, text='%s' %R_inf, show_alert=True) elif data == 'relay_off': R_inf = str(relay_execute('off')) await bot.answerCallbackQuery(query_id, text='%s' %R_inf, show_alert=True) elif data == 'relay_info': R=str(relay_read()) await bot.answerCallbackQuery(query_id, text='%s'%R, show_alert=True) #управление сигнализацией воды elif data == 'water_on': inf = str(alert_f('on', water_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) elif data == 'water_off': inf = str(alert_f('off', water_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) elif data == 'water_alert_info': inf = str(alert_info_f(water_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) #управление сигнализацией движения elif data == 'motion_on': inf = str(alert_f('on', motion_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) elif data == 'motion_off': inf = str(alert_f('off', motion_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) elif data == 'motion_alert_info': inf = str(alert_info_f(motion_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) #управление сигнализацией температуры elif data == 'temp_on': inf = str(alert_f('on', temper_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) elif data == 'temp_off': inf = str(alert_f('off', temper_id)) await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) elif data == 'temp_alert_info': inf = str(alert_info_f(temper_id)) info_c_t = str(c_t_read()) inf = inf+info_c_t await bot.answerCallbackQuery(query_id, text='%s' %inf, show_alert=True) elif data == 'temp_alert_min': id_write_critical_temper = 1 await bot.answerCallbackQuery(query_id, text='Установите min порог срабатывания температурной сигнализации. Введите целое число.', show_alert=True) else: next=1 else: await bot.answerCallbackQuery(query_id, text='У вас нет доступа', show_alert=True) #В TOKEN должен находиться ваш токен, полученый при создании бота! #замените значение на свои данные! TOKEN = "12345678:xxxxx-xyxyxyxyxyxyxyxyxyxyxyxyxyxyx" bot = telepot.aio.Bot(TOKEN) loop = asyncio.get_event_loop() #вызов списка ваших функций для работы с api loop.create_task(bot.message_loop({'chat': on_chat_message, 'callback_query': on_callback_query})) #project: home-smart-home.ru print('Listening ...') loop.run_forever() |
Делаем файл исполняемым:
1 |
# chmod 700 /home/pi/bot.py |
Вам необходимо в теле бота заменить разрешенные айдишники со значением None на свои, а также записать в TOKEN свои данные, полученные при регистрации бота. Как узнать свой id в Telegram?. Айдишник можно узнать, просто запустив бота и нажав на старт. Вы увидите диагностическую информацию с вашим айдишником на экране консоли.
1 2 3 4 5 6 7 8 |
root@raspberrypi:~# /home/pi/bot.py Listening ... Chat: text private id отправителя сообщения: 1XXXXXXXX - это ваш айдишник главное меню Chat: text private id отправителя сообщения: 1XXXXXXXX - это ваш айдишник сигнализация |
Но перед тем как запускать бота, убедитесь, что все необходимые директории и файлы из bot.py представлены в системе! (либо перепишите сценарий под себя).
Для того чтобы вы лучше понимали, как и что должно будет работать и зачем все файлы и команды ниже, нарисую маленькую схему. Схема не претендует на звание лучшей, она лишь отображает все в упрощенном виде:
Теперь опишем все более детально.
Директория для состояния сигнализаций и файл с критической температурой:
1 2 |
# mkdir /home/pi/alert_state # echo 10 > /home/pi/alert_state/critical_temp |
Сценарий включения реле (signal на 20 pin BCM):
1 |
# vim /home/pi/relay_on.py |
1 2 3 4 5 6 7 8 9 10 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import RPi.GPIO as GPIO gpio_pin_number=20 #на 20 пине (BCM) сигнальный контакт реле GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) GPIO.setup(gpio_pin_number, GPIO.OUT) GPIO.output(gpio_pin_number, GPIO.HIGH) |
1 |
# chmod +x /home/pi/relay_on.py |
Сценарий отключения реле (signal на 20 pin BCM):
1 |
# vim /home/pi/relay_off.py |
1 2 3 4 5 6 7 8 9 10 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import RPi.GPIO as GPIO gpio_pin_number=20 #на 20 пине (BCM) сигнальный контакт реле GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) GPIO.setup(gpio_pin_number, GPIO.OUT) GPIO.output(gpio_pin_number, GPIO.LOW) |
1 |
# chmod +x /home/pi/relay_off.py |
Текущее состояние реле смотрим через пин, на котором подключен сигнальный провод реле. (Конечно, вам никто не мешает сделать более реальный контроль через опторазвязку):
1 |
# vim /home/pi/relay_state.py |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import subprocess from subprocess import Popen, PIPE pin_number=20 #пин с сигнальным проводом реле proc = Popen( "echo %s > /sys/class/gpio/export" % pin_number, shell=True, stdout=PIPE, stderr=PIPE ) proc.wait() proc = Popen( "cat /sys/class/gpio/gpio%s/value" % pin_number, shell=True, stdout=PIPE, stderr=PIPE ) proc.wait() res = proc.communicate() count = res[0].replace("\n","") count = int(count) if count == 0: print('0').replace("\n","") else: print('1').replace("\n","") |
1 |
# chmod +x /home/pi/relay_state.py |
Считывание температуры из Si7021 (подключен по i2c):
1 |
# vim /home/pi/si7021_temp.py |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import smbus import time bus = smbus.SMBus(1) bus.write_byte(0x40, 0xF5) time.sleep(0.3) data0 = bus.read_byte(0x40) data1 = bus.read_byte(0x40) humidity = ((data0 * 256 + data1) * 125 / 65536.0) - 6 time.sleep(0.3) bus.write_byte(0x40, 0xF3) time.sleep(0.3) data0 = bus.read_byte(0x40) data1 = bus.read_byte(0x40) cTemp = ((data0 * 256 + data1) * 175.72 / 65536.0) - 46.85 print "%.2f" %cTemp |
1 |
# chmod +x /home/pi/si7021_temp.py |
Считывание влажности из Si7021 (подключен по i2c):
1 |
# vim /home/pi/si7021_hum.py |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import smbus import time bus = smbus.SMBus(1) bus.write_byte(0x40, 0xF5) time.sleep(0.3) data0 = bus.read_byte(0x40) data1 = bus.read_byte(0x40) humidity = ((data0 * 256 + data1) * 125 / 65536.0) - 6 time.sleep(0.3) bus.write_byte(0x40, 0xF3) time.sleep(0.3) data0 = bus.read_byte(0x40) data1 = bus.read_byte(0x40) cTemp = ((data0 * 256 + data1) * 175.72 / 65536.0) - 46.85 print "%.2f" %humidity |
1 |
# chmod +x /home/pi/si7021_hum.py |
Считывание состояния из датчика наличия воды (капель). Подключен на 17pin BCM:
1 |
# vim /home/pi/water_read.py |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import RPi.GPIO as io pin=17 #сигнальный провод от датчика воды идет на 17 pin io.setmode(io.BCM) io.setup(pin, io.IN) #устанавливаем пин на вход signal = io.input(pin) #считываем сигнал if signal == 1: print "датчик сухой" elif signal == 0: print "на датчике обнаружена вода!" else: print "ошибка!" |
1 |
# chmod +x /home/pi/water_read.py |
Итак, скрипты по управлению реле и считывания данных с датчиков написаны и протестированы, можно потестировать бота. Запускаем. Кликаем. Переписываем chat_allow на разрешенные, например, вписываем в первый свой, во второй — id телеграма жены, нет жены, тогда брата, нет брата, тогда кота 🙂
Протестировали? Красиво? Но это же не все. Во-первых, где программы сигнализаций движения/воды/температуры? Во-вторых, а как мне отправлять сообщения из системы в моего бота? Не тогда, когда я нажимаю на кнопку и получаю callback, а если что-то случится, чтобы бот сам прислал?
Спокойствие, только спокойствие! Сейчас все сделаем.
Оправка сообщений из linux в Telegram
Щас прогреем и поедем (с) Сейчас все напишем.
Для реализации такого сценария нам понадобится все тот же telepot и знание вашего персонального id и токена от бота. Ниже напишу пример скрипта, который будет отправлять входящую переменную $1 через вашего бота на ваш id. Этот сценарий можно будет вызывать любой вашей программой или приложением из системы. Согласитесь, весьма удобно. И где эти вайберы / вотсапы / скайпы ? Правильно, api — отличная вещь.
Ну, за дело:
1 |
# vim /home/pi/telegram_sender.py |
1 2 3 4 5 6 7 8 9 10 11 |
#!/usr/bin/env python3.5 # -*- coding: utf-8 -*- #project: home-smart-home.ru import sys import telepot text = sys.argv[1] chat_id = 123456789 #замените на свой id TOKEN = "ВСТАВЬТЕ ВАШ ТОКЕН" bot = telepot.Bot(TOKEN) bot.sendMessage(chat_id, str(text)) |
Делаем скрипт исполняемым и работающим только под root:
1 |
# chmod 700 /home/pi/telegram_sender.py |
Проверяем, как это работает. Запускаем сценарий и на вход подаем произвольную строку в кавычках:
1 |
# /home/pi/telegram_sender.py 'Привет, хозяин! Готов отсылать все, что ты скажешь :)' |
Радуемся! Дело остается за малым: нужно написать небольшие сигнализации, которые бы отсылали в бота информацию о происшествиях, если таковые случились. Нам потребуется написать циклы, которые будут крутиться всегда и периодически проверять, как дела на наших датчиках.
Пишем свои маленькие «сигнализации»
Общий принцип следующий: вращаем бесконечный цикл, отвечающий за датчик, который будет периодически снимать показания, и если программа обнаружит, что показания проблемные, будет отправлять нам сообщения, скажем, раз в 10 минут (на самом деле лучше так не делать, ибо очень назойливо получать аварийные сообщения, а написать так, чтобы отсылка происходила один раз и потом только тогда, когда состояние снова станет нормальным. Но статья не об этом. Поэтому пример максимально упрощен.
1 |
# vim /home/pi/multialert.py |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import time,os import subprocess from subprocess import Popen, PIPE import threading from threading import Thread t_s = 3 #штатный интервал ожидания t_s2 = 30 #аварийный интервал ожидания воды t_s3 = 30 #повторный интервал ожидания движения t_s4 = 30 #интервал ожидания после того как температура упала ниже критической # "Айдишники" сигнализаций # Сигнализация будет включена или выключена в зависимости от наличия айдишника water_id = '/home/pi/alert_state/w_on' motion_id = '/home/pi/alert_state/m_on' temper_id = '/home/pi/alert_state/t_on' # Далее каждую сигнализацию вращаем в своем классе через трединг. # Сделано зе тем, чтобы каждый цикл имел свой отдельный процесс (тред) # со своим таймаутом после срабатывания сигнализации скоростью работы и т.д. #Класс ВОДЫ class water_check(threading.Thread): def run ( self ): while True: #ели сигнализация сейчас включена (через телеграм) if os.path.exists(water_id): proc = Popen(['''/home/pi/water_read.py'''], shell=True, stdout=PIPE, stderr=PIPE) proc.wait() w = proc.communicate()[0] w = w.decode(encoding='utf-8') w = w.replace('\n','') if w == u'на датчике обнаружена вода!': print w text = "Хозяин! Мы тонем! На датчике вода! Проверь скоре" subprocess.call(['''/home/pi/telegram_sender.py "%s"''' %text], shell=True) time.sleep(t_s2) elif w == u'датчик сухой': time.sleep(t_s) else: print("ERR") time.sleep(t_s) else: #если сигналка воды сейчас неактивна print('сигналка воды выключена') time.sleep(t_s) water_check().start() #Класс ДВИЖЕНИЯ class motion_check(threading.Thread): def run ( self ): while True: #ели сигнализация движения сейчас включена (через телеграм) if os.path.exists(motion_id): proc = Popen(['''/home/pi/motion_read.py'''], shell=True, stdout=PIPE, stderr=PIPE) proc.wait() m = proc.communicate()[0] m = m.decode(encoding='utf-8') m = m.replace('\n','') if m == u'обнаружено движение': print m text = "Хозяин! Кто-то тут ходит! Это ты? Мне страшно.." subprocess.call(['''/home/pi/telegram_sender.py "%s"''' %text], shell=True) time.sleep(t_s3) else: print("ERR") time.sleep(t_s) else: #если сигналка движения сейчас неактивна print('сигналка движения выключена') time.sleep(t_s) motion_check().start() #обьявим необходимые ф-ции для температуры: #считывание минимального градуса # файл должен быть создан и в нем должно быть целое значение def criticar_read(): inf = Popen('''cat /home/pi/alert_state/critical_temp''', shell=True, stdout = PIPE) inf.wait() out = inf.communicate() t = out[0].replace("\n","") t = int(t) return t # получаем актуальную температуру # cтрого говоря так делать не надо :) # Потому что вы обращаетесь к одному девайсу из разных # мест и шина i2c может быть занят другим запросом. # в дальнейшем стоит позаботиться о текущей температуре и # запрашивать ее только из единого места. Сейчас это может делать бот телеграми и этот скрипт # поэтому есть вероятность того что при одновременном запросе вы получите отсутствие данных # тем не менее можно сделать минимальную фильтрацию на входе и это частично поможет. #project: home-smart-home.ru def temper_inf(): txt = "/home/pi/si7021_temp.py" exe = Popen("%s" % txt, shell=True, stdout = PIPE) exe.wait() inf = exe.communicate() inf = inf[0].replace("\n","") if inf == '': print('нету результата текущей температуры') none = 1000 return none elif inf == 'None': none = 1000 print('нету результата текущей температуры') return none else: t = round(float(inf)) t = int(t) return t #Класс ТЕМПЕРАТУРЫ class temper_check(threading.Thread): def run ( self ): while True: #если сигнализация включена if os.path.exists(temper_id): #считываем занчения T и лимита срабатывания t_now = temper_inf() t_critical = criticar_read() #если датчик не ответил (лучше всегда ставить проверки такого рода) if t_now == 1000: time.sleep(t_s) #если считанная температура ниже минимальной elif t_now < t_critical: info = "Текущая температура: %s C" %t_now print(info) text = "Хозяин! Температура ниже критической! Я замерзаю... Текущая температура: %s C" %t_now subprocess.call(['''/home/pi/telegram_sender.py "%s"''' %text], shell=True) time.sleep(t_s4) else: #если считанная температура в норме time.sleep(t_s) else: #если сигналка выключена time.sleep(t_s) temper_check().start() |
Не забываем сделать сценарий исполняемым:
1 |
# chmod +x /home/pi/multialert.py |
Последний скрипт, который мы не указали и который должен присутствовать, — это скрипт по контролю объемника.
Мы воспользуемся прерываниями библиотеки gpio и будем ждать, когда произойдет движение, и только тогда скажем об этом нашему треду из multialert.py:
1 |
# vim /home/pi/motion_read.py |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!/usr/bin/env python # -*- coding: utf-8 -*- #project: home-smart-home.ru import subprocess import RPi.GPIO as GPIO def RCtime (RCpin): GPIO.setmode(GPIO.BCM) GPIO.setup(RCpin, GPIO.IN) # GPIO.wait_for_edge(RCpin,GPIO.FALLING) #можете поиграть с этими строками, оставив 1 нужную GPIO.wait_for_edge(RCpin,GPIO.RISING) #можете поиграть с этими строками, оставив 1 нужную # GPIO.wait_for_edge(RCpin,GPIO.BOTH) #можете поиграть с этими строками, оставив 1 нужную signal = GPIO.input(RCpin) print "обнаружено движение" RCtime(16) |
Добавляем бит исполнения:
1 |
# chmod +x /home/pi/motion_read.py |
Обьемник подключен на 16 пин. При запуске этого сценария вы получите ответ, что движение есть только тогда, когда ваш обьемник действительно сработает. После этого программа завершится.
Добавление бота Telegram и остальных сценариев в автозагрузку
Знающие люди, конечно, могут написать своих демонов (шутка). Но если вы хотите, чтобы бот и сигнализации всего лишь постоянно работали после загрузки системы, можно добавить все в /etc/rc.local:
1 |
# vim /etc/rc.local |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
#!/bin/sh -e # # rc.local # # This script is executed at the end of each multiuser runlevel. # Make sure that the script will "exit 0" on success or any other # value on error. # # In order to enable or disable this script just change the execution # bits. # # By default this script does nothing. # Print the IP address _IP=$(hostname -I) || true if [ "$_IP" ]; then printf "My IP address is %s\n" "$_IP" fi #бот телеграма #home-smart-home.ru /home/pi/bot.py & #сигнализации #home-smart-home.ru /home/pi/multialert.py & exit 0 |
Итог
В статье предоставлен весьма подробный туториал, как сделать своего бота, работающего с различными устройствами. Причем выглядит это весьма неплохо, все кликабельно и претендует на жизнь.
Дополнительно описаны процессы создания сигнализаций и простых отсылок сообщений из системы.
Все в совокупности может тянуть на небольшой проект, НО не стоит радоваться раньше времени! Перед тем как пустить это в дело в автономное плавание, программно стоит позаботиться о корректности всех входящих данных, протестировать: а что будет, если у вас нет интернета, а если датчик перегорит , если провод сожрет кот, если… если, если.
Можно считать этот материал лабораторным, но весьма близким к реальному.
Всем упорства в работе, отсутствия коротких замыканий и да прибудет с вами сила 🙂
Подписывайтесь на блог, обещаю потихоньку выкладывать такие же функциональные и полезные статьи.
Подскажите пожалуйста при помощи чего вы воспроизводите речь?
Спасибо, очень хорошая статья.
Подскажите пожалуйста, как управлять через бот не одним реле, а тремя.
Большое спасибо, за ответ.
А как отправить например картинку лежащую в той же директории что запускаемый файл?
Добрый день,
подскажите пож-ста,
как настроить на rasberry Pi обход блокировки Telegram для бота.
А то уже неделю, как управление перестало работать.
У меня настроено управление отоплением и контроль температуры по всему дому и на улице,
уже привык к классному сервису, а тут Роском надзор весь кайф обломал
Тоже долго искал, вставить где нибудь в начале:
telepot.aio.api.set_proxy(«http://server:port»)
Александр, спасибо за ваш ответ. Я новичок, подскажите пожалуйста, не могу разобраться — какой сервер и порт указывать? Можете рабочий пример написать?
локально поднять тор
Ааа, допер, спасибо! Я неделю с этим мучался, с обходом блокировки, а оказывается всё так просто =)
можно актуализировать статью на 2019год
Всем привет.
Статья супер. Я по ней себе сделал управление умным домом.
Очень круто. Автору спасибо. Всем рекомендую повторить 🙂
Удалось подключить реле?
Да, у меня управление реле. А вообще там можно все что хочешь сделать.
Не хрена не получается. Я уже четвёртый день с этим реле воюю, не могу ума дать оно включается с телеграмм и то кнопкой отключить, и не выключается. Кто бы статью поправил, или сделал бы ссылку на новую если можно. Ждём ссылок на статьи!
Статья очень сложная для новичков, было бы удобнее от простого к сложному. Например, не имея реле и прочих датчиков не смог написать включение/выключение диода на макетной плате.
Диод слишком просто. Охото зрелищ. Вот все и стремятся к лучшему. У вас получилось настроить бота?