knd_klinteplo/des_assigned.py
2022-01-19 18:38:00 +03:00

265 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import random
import string
# import json
import datetime
import time
import os
from classes.users import Users
import classes.desinfection as des
import classes.proverka as prov
def get_questions(id_task, session, headers):
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(id_task)+'/questions'
questions = session.get(url, headers=headers)
json_quest = questions.json()
return json_quest['selected_chains']
def send_image(session, headers, url, photo):
"""Отправляем фото на сервер. Сервер должен вернуть id и url."""
files = {
'image': (photo[-5:], open(photo, 'rb')),
}
# print(files)
img = session.post('https://knd.mosreg.ru//api/v1/executions/'+str(url)+'/images', headers=headers, files=files)
if img.status_code == 200:
print(f"Фото {photo} успешно отправлено")
return img.json()
else:
print(f"Не удалось загрузить фото на сервер по заданию {url}, статус ошибки {img.status_code}! Повторная попытка!")
second_img = send_image(session, headers, url, photo)
return second_img
# def complete_task(session, headers, task, address):
# photos = get_photo(address, headers['uid'])
# if len(photos) > 0:
# photos.sort()
# print(task['id'])
# id_task = task['id']
# questions = get_questions(id_task, session, headers)
# data = []
# for answers in questions:
# answer_chain = {
# 'id': answers['id'],
# 'questions': [] # answers['questions']
# }
# if answer_chain['id'] == 6146: # Цепочка без ответов
# continue
# for components in answers['questions']:
# comp = []
# q_c = {
# 'id': components['id'],
# 'question_components': []
# }
# if q_c['id'] == 6560: # Вопрос без ответа
# continue
# comp.append(q_c)
# answer_chain.update({'questions': comp})
# data.append(answer_chain)
# if len(data) == len(photos):
# for chain in data:
# for item in chain['questions']:
# if len(data) == len(photos):
# photo_from_photos = photos.pop(0)
# img = send_image(session, headers, task['id'], photo_from_photos) # photos.pop(0))
# # count_send_photo = 0
# # while img['id'] is None:
# # count_send_photo += 1
# # print("Попытка № " + str(count_send_photo) + "отправить фото" + str(photo_from_photos))
# # img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# first = {"id": 18850, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# first.update({'end_time': int(time.time())})
# second = {"id": 18851, "answer": {"location": task['coord']}, 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# second.update({'end_time': int(time.time())})
# item['question_components'].append(first)
# item['question_components'].append(second)
# else:
# # img = send_image(session, headers, task['id'], photos.pop(0))
# photo_from_photos = photos.pop(0)
# img = send_image(session, headers, task['id'], photo_from_photos) # photos.pop(0))
# # count_send_photo = 0
# # while img['id'] is None:
# # count_send_photo += 1
# # print("Попытка № " + str(count_send_photo) + " отправить фото" + str(photo_from_photos))
# # img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# first = {"id": 18856, "answer": 77, 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# first.update({'end_time': int(time.time())})
# second = {"id": 18857, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# second.update({'end_time': int(time.time())})
# item['question_components'].append(first)
# item['question_components'].append(second)
# ans = {"question_chains": data}
# # print(ans)
# session_t = session
# session_t.headers.update(
# {
# 'referer': 'https://knd.mosreg.ru/executions'+str(task['id'])+'/solve',
# 'x-platform': 'wb',
# 'accept': '*/*'
# }
# )
# complete = session_t.post('https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/answers', headers=headers, json=ans)
# if complete.status_code == 200:
# print(f'Задание по адресу {address} выполнено успешно!')
# else:
# print(f'Задание по адресу {address} не выполненно! Статус ошибки {complete.status_code}')
# else:
# print(f"Несоответствие количества вопросов и количества фотографий по адресу: {address}")
# # send_execution(task,session, headers)
def get_photo(address, email):
photos = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data', email, str(month), address)
if not os.path.exists(main_path):
return photos
else:
for images in os.listdir(main_path):
if images.endswith(".jpg") or images.endswith(".jpeg") or images.endswith(".JPG"):
photos.append(main_path + '/' + images)
return photos
def get_addresses(email):
addresses = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data')
path = os.walk(os.path.join(main_path, email, str(month)))
if not os.path.exists(os.path.join(main_path, email, str(month))):
return f"Не найдена папка с месяцем у пользователя {email}! Проверьте структуру каталога!"
for d, dirs, files in path:
addresses.append(dirs)
if len(addresses[0]) > 0:
return addresses[0]
else:
return f"Ни одного адреса для пользователя {email} не найдено! Добавьте фото и запустите программу снова!"
# def assign_task(session, headers, task, address):
# url = 'https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/assign'
# task_assign = session.put(url, headers=headers)
# if task_assign.status_code == 200:
# print(f'Получили задани по адресу: {address}')
# complete_task(session, headers, task, address)
# else:
# print(f"Не смогли получить задание {address} {task['id']}!")
def get_task(session, headers):
"""Получаем все доступные задания. Если в переменную search добавить адрес,
то будет искать этот адрес. Он попадет в
json_tasks['executions_available'],
так что надо понять откуда берем задания. Отсюда можно возвращать
все параметры задания:
1 координаты
2 адрес
3 подъезд
4 id_задания"""
desinfection = 'Дезинфекция подъездов (МОП)'
proverka = 'Систематическая проверка подъездов МКД'
# Получаем адреса
addresses = get_addresses(headers['uid'])
# Если адреса получили
if isinstance(addresses, list):
session_t = session
session_t.headers.update(
{'referer': 'https://knd.mosreg.ru/executions',
'x-platform': 'android',
'accept': '*/*'
}
)
for address in addresses:
# change '-' for '/'
# need split enterance from address
addr = address.replace("Чаиковского", "Чайковского")
addr = address.replace("Маиданово", "Майданово")
search = addr.replace("_", "/")[:-4]
# print(search)
data = {'search': search}
tasks = session_t.post('https://knd.mosreg.ru//api/v1/actor/executions',
headers=headers,
data=data
)
json_tasks = tasks.json()
# get_t = False
# Если есть доступное задание с таким адресом
exec_assigned = json_tasks['executions_assigned'] # Уже полученные задания
exec_available = json_tasks['executions_available'] # Еще не принятые задания
# print(address)
if len(exec_assigned) > 0:
for n in exec_assigned:
coord = n['coord']
enterances = n['tasks']
for execution in enterances:
address_from_tasks = execution['dimensions'][0].get('entity_value_name')
name_from_tasks = execution['name']
if name_from_tasks == desinfection:
enterance = execution['dimensions'][2].get('entity_value_name')
if address_from_tasks == 'Клин г, ' + addr.replace("_", "/")[:-4] and enterance[-1] == address[-2]:
print('\n')
print(name_from_tasks)
print('\n')
task = {
'id': execution['execution_id'],
'address': execution['dimensions'][0].get('entity_value_name'),
'enterance': execution['dimensions'][2].get('entity_value_name'),
'coord': coord
}
if name_from_tasks == desinfection:
des.assign_task(session, headers, task, address)
elif name_from_tasks == proverka:
pass
# prov.assign_task(session, headers, task, address)
else:
pass
else:
print(f"Нет доступных заданий для пользователя {headers['uid']} по адресу: {address}")
# Если адреса не получили выводим ошибку
else:
print(addresses)
def main():
users = Users()
logins = users.get_passwords()
if isinstance(logins, list):
url = 'https://knd.mosreg.ru//api/v1/auth/sign_in'
for user in logins:
session = requests.Session()
login = {
'email': user['email'],
'password': user['password']
}
response = session.post(url, data=login)
headers = {
'client': response.headers.get('client'),
'Access-token': response.headers.get('Access-token'),
'uid': response.headers.get('uid'),
}
if response.status_code == 200:
get_task(session, headers)
else:
print(f"Отказ в авторизации для пользователя {user['email']}!")
time.sleep(1)
else:
print(users.get_passwords())
if __name__ == "__main__":
main()
k = input("Press ENTER for exit")
def random_string(stringLength):
letters = string.ascii_letters
digits = string.digits
return ''.join(random.choice(letters+digits) for i in range(stringLength))