# import json import datetime import os import random import string import time import requests import classes.desinfection as des # import classes.proverka as prov from classes.users import Users def get_questions(id_task, session, headers): url = 'https://knd.mosreg.ru//api/v1/executions/'+str(id_task)+'/questions' questions = session.get(url, headers=headers) json_quest = questions.json() return json_quest['selected_chains'] def send_image(session, headers, url, photo): """Отправляем фото на сервер. Сервер должен вернуть id и url.""" files = { 'image': (photo[-5:], open(photo, 'rb')), } # print(files) img = session.post('https://knd.mosreg.ru//api/v1/executions/'+str(url) + '/images', headers=headers, files=files) if img.status_code == 200: print(f"Фото {photo} успешно отправлено") return img.json() else: print(f"Не удалось загрузить фото на сервер по заданию {url}, \ статус ошибки {img.status_code}! Повторная попытка!") second_img = send_image(session, headers, url, photo) return second_img def get_photo(address, email): photos = [] month = datetime.datetime.now().month main_path = os.path.join(os.path.abspath('.'), 'data', email, str(month), address) if not os.path.exists(main_path): return photos else: for images in os.listdir(main_path): if (images.endswith(".jpg") or images.endswith(".jpeg") or images.endswith(".JPG")): photos.append(main_path + '/' + images) return photos def get_addresses(email): addresses = [] month = datetime.datetime.now().month main_path = os.path.join(os.path.abspath('.'), 'data') path = os.walk(os.path.join(main_path, email, str(month))) if not os.path.exists(os.path.join(main_path, email, str(month))): return f"Не найдена папка с месяцем у пользователя {email}! Проверьте \ структуру каталога!" for d, dirs, files in path: addresses.append(dirs) if len(addresses[0]) > 0: return addresses[0] else: return f"Ни одного адреса для пользователя {email} не найдено! \ Добавьте фото и запустите программу снова!" def get_task(session, headers): """Получаем все доступные задания. Если в переменную search добавить адрес, то будет искать этот адрес. Он попадет в json_tasks['executions_available'], так что надо понять откуда берем задания. Отсюда можно возвращать все параметры задания: 1 координаты 2 адрес 3 подъезд 4 id_задания""" desinfection = 'Дезинфекция подъездов (МОП)' proverka = 'Систематическая проверка подъездов МКД' # Получаем адреса addresses = get_addresses(headers['uid']) # Если адреса получили if isinstance(addresses, list): session_t = session session_t.headers.update( {'referer': 'https://knd.mosreg.ru/executions', 'x-platform': 'android', 'accept': '*/*' } ) for address in addresses: # change '-' for '/' # need split enterance from address addr = address.replace("Чаиковского", "Чайковского") addr = address.replace("Маиданово", "Майданово") search = addr.replace("_", "/")[:-4] # print(search) data = {'search': search} tasks = session_t.post('https://knd.mosreg.ru//api/v1/actor/executions', headers=headers, data=data ) json_tasks = tasks.json() # get_t = False # Если есть доступное задание с таким адресом # # Уже полученные задания exec_assigned = json_tasks['executions_assigned'] print(exec_assigned) # Еще не принятые задания exec_available = json_tasks['executions_available'] print(exec_available) # print(address) if len(exec_available) > 0: for n in exec_available: coord = n['coord'] enterances = n['tasks'] for execution in enterances: address_from_tasks = execution['dimensions'][0].get( 'entity_value_name') name_from_tasks = execution['name'] if name_from_tasks == desinfection: enterance = execution['dimensions'][2].get( 'entity_value_name') if address_from_tasks == 'Клин г, ' + addr.replace("_", "/")[:-4] and enterance[-1] == address[-2]: print('\n') print(name_from_tasks) print('\n') task = { 'id': execution['execution_id'], 'address': execution['dimensions'][0].get('entity_value_name'), 'enterance': execution['dimensions'][2].get('entity_value_name'), 'coord': coord } if name_from_tasks == desinfection: des.assign_task( session, headers, task, address) elif name_from_tasks == proverka: pass # prov.assign_task(session, headers, task, address) else: pass else: print( f"Нет доступных заданий для пользователя {headers['uid']} по адресу: {address}") # Если адреса не получили выводим ошибку else: print(addresses) def main(): users = Users() logins = users.get_passwords() if isinstance(logins, list): url = 'https://knd.mosreg.ru//api/v1/auth/sign_in' for user in logins: session = requests.Session() login = { 'email': user['email'], 'password': user['password'] } response = session.post(url, data=login) headers = { 'client': response.headers.get('client'), 'Access-token': response.headers.get('Access-token'), 'uid': response.headers.get('uid'), } if response.status_code == 200: get_task(session, headers) else: print(f"Отказ в авторизации для пользователя {user['email']}!") time.sleep(1) else: print(users.get_passwords()) if __name__ == "__main__": main() k = input("Press ENTER for exit") def random_string(stringLength): letters = string.ascii_letters digits = string.digits return ''.join(random.choice(letters+digits) for i in range(stringLength))