import requests import random import string # import json import datetime import time import os # from classes.users import Users # from classes.users_led import Users_led # from classes.users_dip import Users_dip from classes.users_kp import Users_kp import classes.kp as kp # import classes.desinfection as des # import classes.proverka as prov # import classes.led as led # import classes.dip as osmotr_dip def get_addresses(email): addresses = [] # month = datetime.datetime.now().month main_path = os.path.join(os.path.abspath('.'), 'data_dip') path = os.walk(os.path.join(main_path, email)) # if not os.path.exists(os.path.join(main_path, email, str(month))): # return f"Не найдена папка с месяцем у пользователя {email}! Проверьте \ # структуру каталога!" # print(path) for d, dirs, files in path: addresses.append(dirs) if len(addresses[0]) > 0: return addresses[0] else: return f"Ни одного адреса для пользователя {email} не найдено! \ Добавьте фото и запустите программу снова!" def get_task(session, headers): """Получаем все доступные задания. Если в переменную search добавить адрес, то будет искать этот адрес. Он попадет в json_tasks['executions_available'], так что надо понять откуда берем задания. Отсюда можно возвращать все параметры задания: 1 координаты 2 адрес 3 подъезд 4 id_задания""" # desinfection = 'Дезинфекция подъездов (МОП)' # proverka = 'Систематическая проверка подъездов МКД' # naled = 'Проверка образования наледи на кровле' # dip = 'Осмотр ДИП/ФОП' # s = 'ГЖИ. Системная проверка МКД ' session_t = session session_t.headers.update( {'referer': 'https://knd.mosreg.ru/executions', 'x-platform': 'android', 'accept': '*/*' } ) tasks = session_t.post('https://knd.mosreg.ru//api/v1/executions', headers=headers ) json_tasks = tasks.json() # print(json_tasks) exec_assigned = json_tasks['executions_assigned'] if len(exec_assigned) > 0: for execution in exec_assigned: coord = execution['coord'] tasks = execution['tasks'] for task in tasks: task_name = task['name'] print(task_name) access_task = { 'id': task['execution_id'], 'address': task['dimensions'][1].get('entity_value_name'), 'coord': coord } kp.assign_task(session, headers, access_task) else: print(f"Нет доступных заданий для пользователя {headers['uid']}!") # Получаем адреса # addresses = get_addresses(headers['uid']) # # Если адреса получили # if isinstance(addresses, list): # session_t = session # session_t.headers.update( # {'referer': 'https://knd.mosreg.ru/executions', # 'x-platform': 'android', # 'accept': '*/*' # } # ) # for address in addresses: # # change '-' for '/' # # need split enterance from address # addr = address.replace("Чаиковского", "Чайковского") # addr = address.replace("Маиданово", "Майданово") # addr = address.replace("Молодежныи", "Молодежный") # addr = addr.replace("Ломоносовскии", "Ломоносовский") # search = addr.replace("_", "/") # # print(search) # data = {'search': search} # tasks = session_t.post('https://knd.mosreg.ru//api/v1/actor/executions', # headers=headers, # data=data # ) # json_tasks = tasks.json() # # print(json_tasks) # # get_t = False # # Если есть доступное задание с таким адресом # # # # Уже полученные задания # print(json_tasks) # exec_assigned = json_tasks['executions_assigned'] # print(exec_assigned) # # Еще не принятые задания # exec_available = json_tasks['executions_available'] # # print(exec_available) # # print(address) # if len(exec_assigned) > 0: # # print(len(exec_assigned)) # for n in exec_assigned: # coord = n['coord'] # enterances = n['tasks'] # for execution in enterances: # address_from_tasks = execution['dimensions'][1].get('entity_value_name') # print(address_from_tasks) # name_from_tasks = execution['name'] # # if name_from_tasks == dip: # if name_from_tasks == s: # # enterance = execution['dimensions'][2].get('entity_value_name') # if address_from_tasks == 'г. Клин, ' + addr.replace("_", "/"): # print('\n') # print(name_from_tasks) # print('\n') # task = { # 'id': execution['execution_id'], # 'address': execution['dimensions'][1].get('entity_value_name'), # 'coord': coord # } # if name_from_tasks == desinfection: # # des.assign_task(session, headers, task, address) # pass # elif name_from_tasks == proverka: # pass # # prov.assign_task(session, headers, task, address) # elif name_from_tasks == naled: # pass # # led.assign_task(session, headers, task, address) # elif name_from_tasks == dip: # # osmotr_dip.assign_task(session, headers, task, address) # pass # elif name_from_tasks == s: # osmotr_dip.assign_task(session, headers, task, address) # else: # pass # else: # print(f"Нет доступных заданий для пользователя {headers['uid']} по адресу: {address}") # # Если адреса не получили выводим ошибку # else: # print(addresses) def main(): users = Users_kp() logins = users.get_passwords() if isinstance(logins, list): url = 'https://knd.mosreg.ru//api/v1/auth/sign_in' for user in logins: session = requests.Session() login = { 'email': user['email'], 'password': user['password'] } response = session.post(url, data=login) headers = { 'client': response.headers.get('client'), 'Access-token': response.headers.get('Access-token'), 'uid': response.headers.get('uid'), } if response.status_code == 200: print(f"Пользователь {user['email']} успешно авторизовался!") # print(response.headers) get_task(session, headers) else: print(f"Отказ в авторизации для пользователя {user['email']}!") time.sleep(1) else: print(users.get_passwords()) if __name__ == "__main__": main() k = input("Press ENTER for exit")