knd_klinteplo/naled.py
2024-02-19 13:23:20 +03:00

150 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import random
# import string
# import json
# import datetime
# import time
import os
import requests
# import classes.desinfection as des
# import classes.led as led
# import classes.proverka as prov
# from classes.users import Users
from classes.users_led import Users_led
def get_addresses(email):
addresses = []
# month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data_led')
path = os.walk(os.path.join(main_path, email))
# if not os.path.exists(os.path.join(main_path, email, str(month))):
# return f"Не найдена папка с месяцем у пользователя {email}! Проверьте \
# структуру каталога!"
# print(path)
for d, dirs, files in path:
addresses.append(dirs)
if len(addresses[0]) > 0:
return addresses[0]
else:
return f"Ни одного адреса для пользователя {email} не найдено! \
Добавьте фото и запустите программу снова!"
def get_task(session, headers):
"""Получаем все доступные задания. Если в переменную search добавить адрес,
то будет искать этот адрес. Он попадет в
json_tasks['executions_available'],
так что надо понять откуда берем задания. Отсюда можно возвращать
все параметры задания:
1 координаты
2 адрес
3 подъезд
4 id_задания"""
desinfection = 'Дезинфекция подъездов (МОП)'
proverka = 'Систематическая проверка подъездов МКД'
naled = 'Проверка образования наледи на кровле'
dip = 'Осмотр ДИП/ФОП'
# Получаем адреса
addresses = get_addresses(headers['uid'])
# Если адреса получили
if isinstance(addresses, list):
session_t = session
session_t.headers.update(
{'referer': 'https://knd.mosreg.ru/executions',
'x-platform': 'android',
'accept': '*/*'
}
)
for address in addresses:
# change '-' for '/'
# need split enterance from address
addr = address.replace("Чаиковского", "Чайковского")
addr = address.replace("Маиданово", "Майданово")
search = addr.replace("_", "/")
# print(search)
data = {'search': search}
tasks = session_t.post('https://knd.mosreg.ru//api/v1/actor/executions',
headers=headers,
data=data
)
json_tasks = tasks.json()
# get_t = False
# Если есть доступное задание с таким адресом
#
# Уже полученные задания
exec_assigned = json_tasks['executions_assigned']
# print(exec_assigned)
# Еще не принятые задания
exec_available = json_tasks['executions_available']
# print(exec_available)
# print(address)
if len(exec_assigned) > 0:
for n in exec_assigned:
coord = n['coord']
enterances = n['tasks']
for execution in enterances:
address_from_tasks = execution['dimensions'][0].get('entity_value_name')
# print(address_from_tasks)
name_from_tasks = execution['name']
if name_from_tasks == naled:
enterance = execution['dimensions'][2].get('entity_value_name')
if address_from_tasks == 'Клин г, ' + addr.replace("_", "/"):
print('\n')
print(name_from_tasks)
print('\n')
task = {
'id': execution['execution_id'],
'address': execution['dimensions'][0].get('entity_value_name'),
'enterance': execution['dimensions'][2].get('entity_value_name'),
'coord': coord
}
if name_from_tasks == desinfection:
# des.assign_task(session, headers, task, address)
pass
elif name_from_tasks == proverka:
pass
# prov.assign_task(session, headers, task, address)
elif name_from_tasks == naled:
pass
# led.assign_task(session, headers, task, address)
else:
pass
else:
print(f"Нет доступных заданий для пользователя {headers['uid']} по адресу: {address}")
# Если адреса не получили выводим ошибку
else:
print(addresses)
def main():
users = Users_led()
logins = users.get_passwords()
if isinstance(logins, list):
url = 'https://knd.mosreg.ru//api/v1/auth/sign_in'
for user in logins:
session = requests.Session()
login = {
'email': user['email'],
'password': user['password']
}
response = session.post(url, data=login)
headers = {
'client': response.headers.get('client'),
'Access-token': response.headers.get('Access-token'),
'uid': response.headers.get('uid'),
}
if response.status_code == 200:
get_task(session, headers)
else:
print(f"Отказ в авторизации для пользователя {user['email']}!")
time.sleep(1)
else:
print(users.get_passwords())
if __name__ == "__main__":
main()
k = input("Press ENTER for exit")