new knd_bot; разделены отправки фото по подъездам и графиков

This commit is contained in:
Noretsa 2020-09-08 17:51:24 +03:00
parent ac71e07eb6
commit b7cf9a6885
13 changed files with 1294 additions and 0 deletions

0
classes/__init__.py Normal file
View File

244
classes/create_photo.py Normal file
View File

@ -0,0 +1,244 @@
from datetime import datetime, timedelta
import os
import shutil
from docx import Document
from docx.text.paragraph import Paragraph
from docx.shared import Inches, Pt, RGBColor
from docx.enum.style import WD_STYLE_TYPE
import sys
import subprocess
import re
from random import randint
import random
from pathlib import Path
from pdf2image import convert_from_path
from PIL import Image, ImageFilter, ImageDraw
from PIL.ImageFilter import (
BLUR, CONTOUR, DETAIL, EDGE_ENHANCE, EDGE_ENHANCE_MORE,
EMBOSS, FIND_EDGES, SMOOTH, SMOOTH_MORE, SHARPEN
)
def convert_to(folder, source, timeout=None):
args = [libreoffice_exec(), '--headless', '--convert-to', 'pdf', '--outdir', folder, source]
process = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
filename = re.search('-> (.*?) using filter', process.stdout.decode())
return filename.group(1)
def libreoffice_exec():
# TODO: Provide support for more platforms
if sys.platform == 'darwin':
return '/Applications/LibreOffice.app/Contents/MacOS/soffice'
elif sys.platform == 'win32':
# you need to add libreoffice to path
return 'soffice.exe'
return 'libreoffice'
def create_photo(address, user):
month = datetime.now().month
info = get_info(user)
path_to_template = os.path.join(os.path.abspath('.'), 'data', user)
template_docx = os.path.join(path_to_template, 'new_template.docx')
if len(info) > 0:
if not os.path.exists(template_docx):
# print(f"Копируем шаблоны графиков в {user}")
template_in_user = shutil.copyfile('new_template.docx', template_docx)
path_for_photo = os.path.join(path_to_template, str(month), address)
photos = get_photo(address, user)
if len(photos) > 0:
# print(len(photos))
if len(photos) > 7:
open_document(template_docx, True, info, address, path_for_photo, user)
else:
open_document(template_docx, False, info, address, path_for_photo, user)
return path_for_photo
else:
print(f"Нет фотографий по адресу {address}")
def get_info(user):
path = os.path.join(os.path.abspath('.'), 'data', user, 'img_5.txt')
if not os.path.exists(path):
print(f"Нет файла с информацией у пользователя {user}")
return []
with open(path, 'r') as f:
fd = f.readlines()
return fd
def get_photo(address, email):
photos = []
month = datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data', email, str(month), address)
if not os.path.exists(main_path):
print(f"Нет фотографий по адресу: {address}")
return photos
else:
for images in os.listdir(main_path):
if images.endswith(".jpg") or images.endswith(".jpeg") or images.endswith(".JPG"):
photos.append(main_path + '/' + images)
return photos
def open_document(filename, lift, info, address, path_for_photo, user):
# print(f"Наличие лифта по адресу: {address} - {lift}")
# print(f"Начинаем подготавливать график для {address}, шаблон {filename}")
row_for_sign = 0
document = Document(filename)
styles = document.styles
tables = document.tables
date_now = datetime.today()
first_date = tables[0].rows[1].cells[0].text
first_date = datetime.strptime(first_date, '%d.%m.%y')
if datetime.now().date()-timedelta(12) > first_date.date():
clear_table(tables, lift, styles, info)
row_for_sign = fill_table(tables)
document.save(filename)
else:
row_for_sign = fill_table(tables)
document.save(filename)
paragraphs = document.paragraphs
fill_info(paragraphs, styles, info, address, lift)
document.save(filename)
make_png_with_sign(filename, row_for_sign, lift, path_for_photo, user)
def make_png_with_sign(filename, row_for_sign, lift, path_for_photo, user):
# print(f"Создаем картинку из графика {filename}")
horizontal = 0
if lift:
start_vertical = 780
else:
start_vertical = 680
result = convert_to('.', filename, timeout=15)
pages = convert_from_path(result, 200)
pages[0].save('template.png', 'PNG')
img2 = Image.open('template.png')
img2 = img2.convert("RGBA")
datas = img2.getdata()
newData = []
for item in datas:
if item[0] == 255 and item[1] == 255 and item[2] == 255:
newData.append((255, 255, 255, 0))
else:
# if item[0] > 120:
# newData.append((105, 105, 105, 255))
# else:
newData.append(item)
img2.putdata(newData)
while (row_for_sign+1) > 0:
# if lift:
# horizontal = randint(1320, 1450)
# else:
horizontal = randint(1320, 1450)
vertical = start_vertical + ((row_for_sign) * 100)
# clear_background('my_sign.png')
sign = Image.open(os.path.join(os.path.abspath('.'), 'data', user, 'sign.png'))
width, height = sign.size
sign = sign.resize((int(width*1.5), int(height*1.5)))
# print(horizontal)
img2.paste(sign, (horizontal, vertical), sign) # vertical step = 150
row_for_sign -= 1
img2.save('template.png', "PNG")
add_image_to_image('template.png', 'fon2.png', path_for_photo)
def add_image_to_image(filename, filename_fon, dst_folder):
# print(f"Накладываем график {filename} на фон")
left = randint(0, 140)
upper = randint(0, 85)
w_rand = random.uniform(0.8, 1)
b_rand = random.uniform(0.5, 0.8)
try:
angle = 0
# img = img.filter(ImageFilter.GaussianBlur(radius=1))
background = Image.open(filename_fon)
foreground = Image.open(filename)
width, height = foreground.size
size = int(width*0.25), int(height*0.25)
width_b, height_b = background.size
foreground = foreground.resize(size).rotate(angle)
background.paste(foreground, (195, 290), foreground)
background = background.crop((left, upper, width_b*w_rand, height_b*w_rand))
background = background.filter(ImageFilter.GaussianBlur(radius=b_rand))
background.save(os.path.join(dst_folder, 'graph.jpg'))
# print(os.path.join(dst_folder, 'graph.jpg'))
# print(os.path.dirname(dst_folder))
except IOError:
print(sys.exc_info()[0])
pass
def delete_paragraph(paragraph):
p = paragraph._element
p.getparent().remove(p)
p._p = p._element = None
def clear_table(tables, lift, styles, info):
# print(f"Очищаем таблицу в графике")
fio = info[0].split('\n')[0]
count = 0
while count < len(tables[0].rows)-1:
date_to_add = datetime.now().date()+timedelta(count)
delete_paragraph(tables[0].rows[count+1].cells[0].paragraphs[0])
delete_paragraph(tables[0].rows[count+1].cells[2].paragraphs[0])
par = tables[0].rows[count+1].cells[0].add_paragraph()
par_fio = tables[0].rows[count+1].cells[2].add_paragraph()
p = par.add_run(datetime.strftime(date_to_add, '%d.%m.%y'))
p_fio = par_fio.add_run(fio)
par_fio.alignment = 1
p_fio.bold = True
p.bold = True
font = p.font
count += 1
def fill_table(tables):
# print("Заполняем таблицу в графике")
r_count = 0
count = 0
while count < len(tables[0].rows)-1:
date_in_cell = tables[0].rows[count+1].cells[0].text
date_in_cell = datetime.strptime(date_in_cell, '%d.%m.%y')
if date_in_cell.date() == datetime.now().date():
r_count = count
count += 1
return r_count
def fill_info(paragraphs, styles, info, address, lift):
# print(f"Заполняем шапку графика в {address}")
fio = info[0].split('\n')[0]
dol = info[1].split('\n')[0]
phone_number = info[2].split('\n')[0]
for paragraph in paragraphs:
name_style = paragraph.style.name
if 'по адресу:' in paragraph.text:
# p = paragraph.insert_paragraph_before()
style = styles[name_style]
style.font.size = Pt(14)
paragraph.text = ''
p = paragraph.add_run('по адресу: '+address[:-4]+' Подъезд № ' + address[-2])
p.bold = True
font = p.font
font.color.rgb = RGBColor(128, 128, 128)
paragraph.style = styles[name_style]
elif 'Должность:' in paragraph.text:
paragraph.text = ''
if lift:
text_for_perechen = 'Перечень обрабатываемых объектов:\n - Первые этажи подъездов: полы, дверные и оконные ручки, почтовые ящики, панели домофонов и кодовых замков, перила лестничных маршей\n - Лифтовые кабины: напольные покрытия, панели управления лифтами, лифтовые вентиляционные решетки'
else:
text_for_perechen = 'Перечень обрабатываемых объектов:\n - Первые этажи подъездов: полы, дверные и оконные ручки, почтовые ящики, панели домофонов и кодовых замков, перила лестничных маршей'
run = paragraph.add_run(' Должность: ' + dol +' Ф.И.О.: ' + fio + '\n тел: ' + phone_number + '\n' + text_for_perechen)
font = run.font

74
classes/desinfection.py Normal file
View File

@ -0,0 +1,74 @@
import classes.create_photo as cr_photo
import os
import time
def assign_task(session, headers, task, address):
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/assign'
task_assign = session.put(url, headers=headers)
if task_assign.status_code == 200:
print(f'Получили задание по адресу: {address}')
complete_task(session, headers, task, address)
else:
print(f"Не смогли получить задание {address} {task['id']}!")
def complete_task(session, headers, task, address):
photo_path = os.path.join(cr_photo.create_photo(address, headers['uid']), 'graph.jpg')
id_task = task['id']
# questions = get_questions(id_task, session, headers)
question_components = []
yes_no = {"id": 23384, "position": 0, "answer": 77, "start_time": int(time.time())}
time.sleep(2)
yes_no.update({"end_time": int(time.time())})
question_components.append(yes_no)
if not os.path.exists(photo_path):
print(f"Нет фотографии графика по адресу {address}")
else:
img = send_image(session, headers, task['id'], photo_path)#photos.pop(0))
image_graph = {"id": 23385, "position": 1, "answer": [img['id']], "start_time": int(time.time())}
time.sleep(2)
image_graph.update({"end_time": int(time.time())})
question_components.append(image_graph)
questions = [{"id": 7956, "question_components": question_components, "position": 0}]
question_chains = [{"id": 7396, "questions": questions}]
ans = {"question_chains": question_chains}
session_t = session
session_t.headers.update(
{
'referer': 'https://knd.mosreg.ru/executions'+str(task['id'])+'/solve',
'x-platform': 'wb',
'accept': '*/*'
}
)
complete = session_t.post('https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/answers', headers=headers, json=ans)
if complete.status_code == 200:
print(f'Задание по адресу {address} выполнено успешно!\n\n')
else:
print(f'Задание по адресу {address} не выполненно! Статус ошибки {complete.status_code}\n\n')
def send_image(session, headers, url, photo):
"""Отправляем фото на сервер. Сервер должен вернуть id и url."""
files = {
'image': (photo[-5:], open(photo, 'rb')),
}
# print(files)
img = session.post('https://knd.mosreg.ru//api/v1/executions/'+str(url)+'/images', headers=headers, files=files)
if img.status_code == 200:
print(f"Фото {photo} успешно отправлено")
return img.json()
else:
print(f"Не удалось загрузить фото на сервер по заданию {url}, статус ошибки {img.status_code}! Повторная попытка!")
second_img = send_image(session, headers, url, photo)
return second_img
# def get_questions(id_task, session, headers):
# url = 'https://knd.mosreg.ru//api/v1/executions/'+str(id_task)+'/questions'
# questions = session.get(url, headers=headers)
# json_quest = questions.json()
# chain = json_quest['question_chains'][0].get('questions')
# print(chain[0].get('question_components'))
# return json_quest['selected_chains']

137
classes/proverka.py Normal file
View File

@ -0,0 +1,137 @@
import os
import datetime
import time
def assign_task(session, headers, task, address):
# print('assign_task')
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/assign'
task_assign = session.put(url, headers=headers)
if task_assign.status_code == 200:
print(f'Получили задание по адресу: {address}')
complete_task(session, headers, task, address)
else:
print(f"Не смогли получить задание {address} {task['id']}!")
def complete_task(session, headers, task, address):
photos = get_photo(address, headers['uid'])
if len(photos) > 0:
photos.sort()
"""В папке лежит фото графика, которое отправляется из модуля des, поэтому убираем отсюда"""
if len(photos) == 7 or len(photos) == 9:
photos.pop()
id_task = task['id']
quest = get_questions(id_task, session, headers)
length_question = get_length_questions(quest)
prepared_answers = []
if len(photos) == length_question:
prepared_answers = prepare_answer(quest, photos, session, task, headers)
ans = {"question_chains": prepared_answers}
# print(ans)
session_t = session
session_t.headers.update(
{
'referer': 'https://knd.mosreg.ru/executions'+str(task['id'])+'/solve',
'x-platform': 'wb',
'accept': '*/*'
}
)
complete = session_t.post('https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/answers', headers=headers, json=ans)
if complete.status_code == 200:
print(f'Задание по адресу {address} выполнено успешно!\n\n')
else:
print(f'Задание по адресу {address} не выполненно! Статус ошибки {complete.status_code}\n\n')
else:
print(f"\n\n\nНесоответствие количества вопросов и фотографий по адресу {address}!!! \
Необходимо проверить!!!\n\n\n")
# print(len(quest)) 9 and 13
def get_length_questions(quest):
count = 0
for main_chain in quest:
for second_chain in main_chain['questions']:
for third_chain in second_chain['question_components']:
question_type = third_chain['type']
if question_type == "gallery":
count = count + 1
return count
def prepare_answer(quest, photos, session, task, headers):
questions = []
question_components= []
question_chains = []
for main_chain in quest:
for second_chain in main_chain['questions']:
for third_chain in second_chain['question_components']:
question_type = third_chain['type']
if question_type == 'list':
data = {"id": third_chain['id'], "answer": 77, "answer_status": None, "start_time": int(time.time())}
time.sleep(2)
data.update({"end_time": int(time.time())})
question_components.append(data)
elif question_type == "geo":
data = {"id": third_chain['id'], "answer": {"location": task['coord']}, "answer_status": None, \
"start_time": int(time.time())}
time.sleep(2)
data.update({"end_time": int(time.time())})
question_components.append(data)
elif question_type == "gallery":
img = send_image(session, headers, task['id'], photos.pop(0))#photos.pop(0))
data = {"id": third_chain['id'], "answer": [img['id']], "answer_status": None, \
"start_time": int(time.time())}
time.sleep(2)
data.update({"end_time": int(time.time())})
question_components.append(data)
if len(question_components) > 0:
main_data = {
"id": second_chain['id'],
"question_components": question_components
}
questions.append(main_data)
question_components = []
if len(questions) > 0:
q_dict = {
"id": main_chain['id'],
"questions" : questions
}
question_chains.append(q_dict)
questions = []
return question_chains
def get_photo(address, email):
photos = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data', email, str(month), address)
if not os.path.exists(main_path):
return photos
else:
for images in os.listdir(main_path):
if images.endswith(".jpg") or images.endswith(".jpeg") or images.endswith(".JPG"):
photos.append(main_path + '/' + images)
return photos
def get_questions(id_task, session, headers):
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(id_task)+'/questions'
questions = session.get(url, headers=headers)
json_quest = questions.json()
return json_quest['selected_chains']
def send_image(session, headers, url, photo):
"""Отправляем фото на сервер. Сервер должен вернуть id и url."""
files = {
'image': (photo[-5:], open(photo, 'rb')),
}
img = session.post('https://knd.mosreg.ru//api/v1/executions/'+str(url)+'/images', headers=headers, files=files)
if img.status_code == 200:
print(f"Фото {photo} успешно отправлено")
return img.json()
else:
print(f"Не удалось загрузить фото на сервер по заданию {url}, статус ошибки {img.status_code}! Повторная попытка!")
second_img = send_image(session, headers, url, photo)
return second_img

27
classes/users.py Normal file
View File

@ -0,0 +1,27 @@
import os
class Users:
def __init__(self):
self.data = []
self.path = os.path.join(os.path.abspath('.'), 'data')
def get_users(self):
self.logins = []
main_dir = os.walk(self.path)
for d, dirs, files in main_dir:
self.logins.append(dirs)
return self.logins[0]
def get_passwords(self):
if not os.path.exists(self.path):
return "Папка data не существует. Создайте папку с необходимыми параметрами и запустите программу снова!"
self.emails = self.get_users()
if len(self.emails) > 0:
for user in self.emails:
with open(os.path.join(self.path, user, 'pass.txt'), 'r') as f:
log_pass = {'email': user, 'password': f.readline().split('\n')[0]}
self.data.append(log_pass)
else:
return "Нет ни одного пользователя! Заполните папку и запустите программу снова!"
return self.data

View File

@ -0,0 +1,3 @@
Драченко Оксана Владимировна
начальник РЭУ
8(985)574-77-92

Binary file not shown.

View File

@ -0,0 +1 @@
kn2512

264
des.py Normal file
View File

@ -0,0 +1,264 @@
import requests
import random
import string
# import json
import datetime
import time
import os
from classes.users import Users
import classes.desinfection as des
import classes.proverka as prov
def get_questions(id_task, session, headers):
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(id_task)+'/questions'
questions = session.get(url, headers=headers)
json_quest = questions.json()
return json_quest['selected_chains']
def send_image(session, headers, url, photo):
"""Отправляем фото на сервер. Сервер должен вернуть id и url."""
files = {
'image': (photo[-5:], open(photo, 'rb')),
}
# print(files)
img = session.post('https://knd.mosreg.ru//api/v1/executions/'+str(url)+'/images', headers=headers, files=files)
if img.status_code == 200:
print(f"Фото {photo} успешно отправлено")
return img.json()
else:
print(f"Не удалось загрузить фото на сервер по заданию {url}, статус ошибки {img.status_code}! Повторная попытка!")
second_img = send_image(session, headers, url, photo)
return second_img
# def complete_task(session, headers, task, address):
# photos = get_photo(address, headers['uid'])
# if len(photos) > 0:
# photos.sort()
# print(task['id'])
# id_task = task['id']
# questions = get_questions(id_task, session, headers)
# data = []
# for answers in questions:
# answer_chain = {
# 'id': answers['id'],
# 'questions': [] # answers['questions']
# }
# if answer_chain['id'] == 6146: # Цепочка без ответов
# continue
# for components in answers['questions']:
# comp = []
# q_c = {
# 'id': components['id'],
# 'question_components': []
# }
# if q_c['id'] == 6560: # Вопрос без ответа
# continue
# comp.append(q_c)
# answer_chain.update({'questions': comp})
# data.append(answer_chain)
# if len(data) == len(photos):
# for chain in data:
# for item in chain['questions']:
# if len(data) == len(photos):
# photo_from_photos = photos.pop(0)
# img = send_image(session, headers, task['id'], photo_from_photos) # photos.pop(0))
# # count_send_photo = 0
# # while img['id'] is None:
# # count_send_photo += 1
# # print("Попытка № " + str(count_send_photo) + "отправить фото" + str(photo_from_photos))
# # img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# first = {"id": 18850, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# first.update({'end_time': int(time.time())})
# second = {"id": 18851, "answer": {"location": task['coord']}, 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# second.update({'end_time': int(time.time())})
# item['question_components'].append(first)
# item['question_components'].append(second)
# else:
# # img = send_image(session, headers, task['id'], photos.pop(0))
# photo_from_photos = photos.pop(0)
# img = send_image(session, headers, task['id'], photo_from_photos) # photos.pop(0))
# # count_send_photo = 0
# # while img['id'] is None:
# # count_send_photo += 1
# # print("Попытка № " + str(count_send_photo) + " отправить фото" + str(photo_from_photos))
# # img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# first = {"id": 18856, "answer": 77, 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# first.update({'end_time': int(time.time())})
# second = {"id": 18857, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
# time.sleep(2)
# second.update({'end_time': int(time.time())})
# item['question_components'].append(first)
# item['question_components'].append(second)
# ans = {"question_chains": data}
# # print(ans)
# session_t = session
# session_t.headers.update(
# {
# 'referer': 'https://knd.mosreg.ru/executions'+str(task['id'])+'/solve',
# 'x-platform': 'wb',
# 'accept': '*/*'
# }
# )
# complete = session_t.post('https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/answers', headers=headers, json=ans)
# if complete.status_code == 200:
# print(f'Задание по адресу {address} выполнено успешно!')
# else:
# print(f'Задание по адресу {address} не выполненно! Статус ошибки {complete.status_code}')
# else:
# print(f"Несоответствие количества вопросов и количества фотографий по адресу: {address}")
# # send_execution(task,session, headers)
def get_photo(address, email):
photos = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data', email, str(month), address)
if not os.path.exists(main_path):
return photos
else:
for images in os.listdir(main_path):
if images.endswith(".jpg") or images.endswith(".jpeg") or images.endswith(".JPG"):
photos.append(main_path + '/' + images)
return photos
def get_addresses(email):
addresses = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data')
path = os.walk(os.path.join(main_path, email, str(month)))
if not os.path.exists(os.path.join(main_path, email, str(month))):
return f"Не найдена папка с месяцем у пользователя {email}! Проверьте структуру каталога!"
for d, dirs, files in path:
addresses.append(dirs)
if len(addresses[0]) > 0:
return addresses[0]
else:
return f"Ни одного адреса для пользователя {email} не найдено! Добавьте фото и запустите программу снова!"
# def assign_task(session, headers, task, address):
# url = 'https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/assign'
# task_assign = session.put(url, headers=headers)
# if task_assign.status_code == 200:
# print(f'Получили задани по адресу: {address}')
# complete_task(session, headers, task, address)
# else:
# print(f"Не смогли получить задание {address} {task['id']}!")
def get_task(session, headers):
"""Получаем все доступные задания. Если в переменную search добавить адрес,
то будет искать этот адрес. Он попадет в
json_tasks['executions_available'],
так что надо понять откуда берем задания. Отсюда можно возвращать
все параметры задания:
1 координаты
2 адрес
3 подъезд
4 id_задания"""
desinfection = 'Дезинфекция подъездов (МОП)'
proverka = 'Систематическая проверка подъездов МКД'
# Получаем адреса
addresses = get_addresses(headers['uid'])
# Если адреса получили
if isinstance(addresses, list):
session_t = session
session_t.headers.update(
{'referer': 'https://knd.mosreg.ru/executions',
'x-platform': 'wb',
'accept': '*/*'
}
)
for address in addresses:
# change '-' for '/'
# need split enterance from address
addr = address.replace("Чаиковского", "Чайковского")
addr = address.replace("Маиданово", "Майданово")
search = addr.replace("_", "/")[:-4]
# print(search)
data = {'search': search}
tasks = session_t.post('https://knd.mosreg.ru//api/v1/executions',
headers=headers,
data=data
)
json_tasks = tasks.json()
# get_t = False
# Если есть доступное задание с таким адресом
exec_assigned = json_tasks['executions_assigned'] # Уже полученные задания
exec_available = json_tasks['executions_available'] # Еще не принятые задания
# print(address)
if len(exec_available) > 0:
for n in exec_available:
coord = n['coord']
enterances = n['tasks']
for execution in enterances:
address_from_tasks = execution['dimensions'][0].get('entity_value_name')
name_from_tasks = execution['name']
enterance = execution['dimensions'][2].get('entity_value_name')
if address_from_tasks == 'Клин г, ' + addr.replace("_", "/")[:-4] and \
enterance[-1] == address[-2]:
print('\n')
print(name_from_tasks)
print('\n')
task = {
'id': execution['execution_id'],
'address': execution['dimensions'][0].get('entity_value_name'),
'enterance': execution['dimensions'][2].get('entity_value_name'),
'coord': coord
}
if name_from_tasks == desinfection:
des.assign_task(session, headers, task, address)
elif name_from_tasks == proverka:
pass
# prov.assign_task(session, headers, task, address)
else:
pass
else:
print(f"Нет доступных заданий для пользователя {headers['uid']} по адресу: {address}")
# Если адреса не получили выводим ошибку
else:
print(addresses)
def main():
users = Users()
logins = users.get_passwords()
if isinstance(logins, list):
url = 'https://knd.mosreg.ru//api/v1/auth/sign_in'
for user in logins:
session = requests.Session()
login = {
'email': user['email'],
'password': user['password']
}
response = session.post(url, data=login)
headers = {
'client': response.headers.get('client'),
'Access-token': response.headers.get('Access-token'),
'uid': response.headers.get('uid'),
}
if response.status_code == 200:
get_task(session, headers)
else:
print(f"Отказ в авторизации для пользователя {user['email']}!")
time.sleep(1)
else:
print(users.get_passwords())
if __name__ == "__main__":
main()
k = input("Press ENTER for exit")
def random_string(stringLength):
letters = string.ascii_letters
digits = string.digits
return ''.join(random.choice(letters+digits) for i in range(stringLength))

263
knd_bot.py Normal file
View File

@ -0,0 +1,263 @@
import requests
import random
import string
# import json
import datetime
import time
import os
from classes.users import Users
import classes.desinfection as des
import classes.proverka as prov
def get_questions(id_task, session, headers):
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(id_task)+'/questions'
questions = session.get(url, headers=headers)
json_quest = questions.json()
return json_quest['selected_chains']
def send_image(session, headers, url, photo):
"""Отправляем фото на сервер. Сервер должен вернуть id и url."""
files = {
'image': (photo[-5:], open(photo, 'rb')),
}
# print(files)
img = session.post('https://knd.mosreg.ru//api/v1/executions/'+str(url)+'/images', headers=headers, files=files)
if img.status_code == 200:
print(f"Фото {photo} успешно отправлено")
return img.json()
else:
print(f"Не удалось загрузить фото на сервер по заданию {url}, статус ошибки {img.status_code}! Повторная попытка!")
second_img = send_image(session, headers, url, photo)
return second_img
def complete_task(session, headers, task, address):
photos = get_photo(address, headers['uid'])
if len(photos) > 0:
photos.sort()
print(task['id'])
id_task = task['id']
questions = get_questions(id_task, session, headers)
data = []
for answers in questions:
answer_chain = {
'id': answers['id'],
'questions': [] # answers['questions']
}
if answer_chain['id'] == 6146: # Цепочка без ответов
continue
for components in answers['questions']:
comp = []
q_c = {
'id': components['id'],
'question_components': []
}
if q_c['id'] == 6560: # Вопрос без ответа
continue
comp.append(q_c)
answer_chain.update({'questions': comp})
data.append(answer_chain)
if len(data) == len(photos):
for chain in data:
for item in chain['questions']:
if len(data) == len(photos):
photo_from_photos = photos.pop(0)
img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# count_send_photo = 0
# while img['id'] is None:
# count_send_photo += 1
# print("Попытка № " + str(count_send_photo) + "отправить фото" + str(photo_from_photos))
# img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
first = {"id": 18850, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
first.update({'end_time': int(time.time())})
second = {"id": 18851, "answer": {"location": task['coord']}, 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
second.update({'end_time': int(time.time())})
item['question_components'].append(first)
item['question_components'].append(second)
else:
#img = send_image(session, headers, task['id'], photos.pop(0))
photo_from_photos = photos.pop(0)
img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# count_send_photo = 0
# while img['id'] is None:
# count_send_photo += 1
# print("Попытка № " + str(count_send_photo) + " отправить фото" + str(photo_from_photos))
# img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
first = {"id": 18856, "answer": 77, 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
first.update({'end_time': int(time.time())})
second = {"id": 18857, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
second.update({'end_time': int(time.time())})
item['question_components'].append(first)
item['question_components'].append(second)
ans = {"question_chains": data}
# print(ans)
session_t = session
session_t.headers.update(
{
'referer': 'https://knd.mosreg.ru/executions'+str(task['id'])+'/solve',
'x-platform': 'wb',
'accept': '*/*'
}
)
complete = session_t.post('https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/answers', headers=headers, json=ans)
if complete.status_code == 200:
print(f'Задание по адресу {address} выполнено успешно!')
else:
print(f'Задание по адресу {address} не выполненно! Статус ошибки {complete.status_code}')
else:
print(f"Несоответствие количества вопросов и количества фотографий по адресу: {address}")
# send_execution(task,session, headers)
def get_photo(address, email):
photos = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data', email, str(month), address)
if not os.path.exists(main_path):
return photos
else:
for images in os.listdir(main_path):
if images.endswith(".jpg") or images.endswith(".jpeg") or images.endswith(".JPG"):
photos.append(main_path + '/' + images)
return photos
def get_addresses(email):
addresses = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data')
path = os.walk(os.path.join(main_path, email, str(month)))
if not os.path.exists(os.path.join(main_path, email, str(month))):
return f"Не найдена папка с месяцем у пользователя {email}! Проверьте структуру каталога!"
for d, dirs, files in path:
addresses.append(dirs)
if len(addresses[0]) > 0:
return addresses[0]
else:
return f"Ни одного адреса для пользователя {email} не найдено! Добавьте фото и запустите программу снова!"
# def assign_task(session, headers, task, address):
# url = 'https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/assign'
# task_assign = session.put(url, headers=headers)
# if task_assign.status_code == 200:
# print(f'Получили задани по адресу: {address}')
# complete_task(session, headers, task, address)
# else:
# print(f"Не смогли получить задание {address} {task['id']}!")
def get_task(session, headers):
"""Получаем все доступные задания. Если в переменную search добавить адрес,
то будет искать этот адрес. Он попадет в
json_tasks['executions_available'],
так что надо понять откуда берем задания. Отсюда можно возвращать
все параметры задания:
1 координаты
2 адрес
3 подъезд
4 id_задания"""
desinfection = 'Дезинфекция подъездов (МОП)'
proverka = 'Систематическая проверка подъездов МКД'
# Получаем адреса
addresses = get_addresses(headers['uid'])
# Если адреса получили
if isinstance(addresses, list):
session_t = session
session_t.headers.update(
{'referer': 'https://knd.mosreg.ru/executions',
'x-platform': 'wb',
'accept': '*/*'
}
)
for address in addresses:
# change '-' for '/'
# need split enterance from address
addr = address.replace("Чаиковского", "Чайковского")
addr = address.replace("Маиданово", "Майданово")
search = addr.replace("_", "/")[:-4]
# print(search)
data = {'search': search}
tasks = session_t.post('https://knd.mosreg.ru//api/v1/executions',
headers=headers,
data=data
)
json_tasks = tasks.json()
get_t = False
# Если есть доступное задание с таким адресом
exec_assigned = json_tasks['executions_assigned'] # Уже полученные задания
exec_available = json_tasks['executions_available'] # Еще не принятые задания
# print(address)
if len(exec_available) > 0:
for n in exec_available:
coord = n['coord']
enterances = n['tasks']
for execution in enterances:
address_from_tasks = execution['dimensions'][0].get('entity_value_name')
name_from_tasks = execution['name']
enterance = execution['dimensions'][2].get('entity_value_name')
if address_from_tasks == 'Клин г, '+ addr.replace("_", "/")[:-4] and \
enterance[-1] == address[-2]:
print('\n')
print(name_from_tasks)
print('\n')
task = {
'id': execution['execution_id'],
'address': execution['dimensions'][0].get('entity_value_name'),
'enterance': execution['dimensions'][2].get('entity_value_name'),
'coord': coord
}
if name_from_tasks == desinfection:
des.assign_task(session, headers, task, address)
elif name_from_tasks == proverka:
prov.assign_task(session, headers, task, address)
else:
pass
else:
print(f"Нет доступных заданий для пользователя {headers['uid']} по адресу: {address}")
# Если адреса не получили выводим ошибку
else:
print(addresses)
def main():
users = Users()
logins = users.get_passwords()
if isinstance(logins, list):
url = 'https://knd.mosreg.ru//api/v1/auth/sign_in'
for user in logins:
session = requests.Session()
login = {
'email': user['email'],
'password': user['password']
}
response = session.post(url, data=login)
headers = {
'client': response.headers.get('client'),
'Access-token': response.headers.get('Access-token'),
'uid': response.headers.get('uid'),
}
if response.status_code == 200:
get_task(session, headers)
else:
print(f"Отказ в авторизации для пользователя {user['email']}!")
time.sleep(1)
else:
print(users.get_passwords())
if __name__ == "__main__":
main()
k = input("Press ENTER for exit")
def random_string(stringLength):
letters = string.ascii_letters
digits = string.digits
return ''.join(random.choice(letters+digits) for i in range(stringLength))

268
knd_bot_assigned.py Normal file
View File

@ -0,0 +1,268 @@
import requests
import random
import string
# import json
import datetime
import time
import os
from classes.users import Users
def get_questions(id_task, session, headers):
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(id_task)+'/questions'
questions = session.get(url, headers=headers)
json_quest = questions.json()
return json_quest['selected_chains']
def send_image(session, headers, url, photo):
"""Отправляем фото на сервер. Сервер должен вернуть id и url."""
files = {
'image': (photo[-5:], open(photo, 'rb')),
}
# print(files)
img = session.post('https://knd.mosreg.ru//api/v1/executions/'+str(url)+'/images', headers=headers, files=files)
if img.status_code == 200:
print(f"Фото {photo} успешно отправлено")
return img.json()
else:
print(f"Не удалось загрузить фото на сервер по заданию {url}, статус ошибки {img.status_code}! Повторная попытка!")
second_img = send_image(session, headers, url, photo)
return second_img
def complete_task(session, headers, task, address):
photos = get_photo(address, headers['uid'])
if len(photos) > 0:
photos.sort()
print(task['id'])
id_task = task['id']
questions = get_questions(id_task, session, headers)
data = []
for answers in questions:
answer_chain = {
'id': answers['id'],
'questions': []#answers['questions']
}
if answer_chain['id'] == 6146: # Цепочка без ответов
continue
for components in answers['questions']:
comp = []
q_c = {
'id': components['id'],
'question_components': []
}
if q_c['id'] == 6560: # Вопрос без ответа
continue
comp.append(q_c)
answer_chain.update({'questions': comp})
data.append(answer_chain)
if len(data) == len(photos):
for chain in data:
for item in chain['questions']:
if len(data) == len(photos):
photo_from_photos = photos.pop(0)
img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# count_send_photo = 0
# while img['id'] is None:
# count_send_photo += 1
# print("Попытка № " + str(count_send_photo) + "отправить фото" + str(photo_from_photos))
# img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
first = {"id": 18850, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
first.update({'end_time': int(time.time())})
second = {"id": 18851, "answer": {"location": task['coord']}, 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
second.update({'end_time': int(time.time())})
item['question_components'].append(first)
item['question_components'].append(second)
else:
#img = send_image(session, headers, task['id'], photos.pop(0))
photo_from_photos = photos.pop(0)
img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
# count_send_photo = 0
# while img['id'] is None:
# count_send_photo += 1
# print("Попытка № " + str(count_send_photo) + " отправить фото" + str(photo_from_photos))
# img = send_image(session, headers, task['id'], photo_from_photos)#photos.pop(0))
first = {"id": 18856, "answer": 77, 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
first.update({'end_time': int(time.time())})
second = {"id": 18857, "answer": [img['id']], 'answer_status': None, 'start_time': int(time.time())}
time.sleep(2)
second.update({'end_time': int(time.time())})
item['question_components'].append(first)
item['question_components'].append(second)
ans = {"question_chains": data}
# print(ans)
session_t = session
session_t.headers.update(
{
'referer': 'https://knd.mosreg.ru/executions'+str(task['id'])+'/solve',
'x-platform': 'wb',
'accept': '*/*'
}
)
complete = session_t.post('https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/answers', headers=headers, json=ans)
if complete.status_code == 200:
print(f'Задание по адресу {address} выполнено успешно!')
else:
print(f'Задание по адресу {address} не выполненно! Статус ошибки {complete.status_code}')
else:
print(f"Несоответствие количества вопросов и количества фотографий по адресу: {address}")
# send_execution(task,session, headers)
def get_photo(address, email):
photos = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data', email, str(month), address)
if not os.path.exists(main_path):
return photos
else:
for images in os.listdir(main_path):
if images.endswith(".jpg") or images.endswith(".jpeg") or images.endswith(".JPG"):
photos.append(main_path + '/' + images)
return photos
def get_addresses(email):
addresses = []
month = datetime.datetime.now().month
main_path = os.path.join(os.path.abspath('.'), 'data')
path = os.walk(os.path.join(main_path, email, str(month)))
if not os.path.exists(os.path.join(main_path, email, str(month))):
return f"Не найдена папка с месяцем у пользователя {email}! Проверьте структуру каталога!"
for d, dirs, files in path:
addresses.append(dirs)
if len(addresses[0]) > 0:
return addresses[0]
else:
return f"Ни одного адреса для пользователя {email} не найдено! Добавьте фото и запустите программу снова!"
def assign_task(session, headers, task, address):
url = 'https://knd.mosreg.ru//api/v1/executions/'+str(task['id'])+'/assign'
task_assign = session.put(url, headers=headers)
if task_assign.status_code == 200:
print(f'Получили задани по адресу: {address}')
complete_task(session, headers, task, address)
else:
print(f"Не смогли получить задание {address} {task['id']}!")
def get_task(session, headers):
"""Получаем все доступные задания. Если в переменную search добавить адрес,
то будет искать этот адрес. Он попадет в
json_tasks['executions_available'],
так что надо понять откуда берем задания. Отсюда можно возвращать
все параметры задания:
1 координаты
2 адрес
3 подъезд
4 id_задания"""
# Получаем адреса
addresses = get_addresses(headers['uid'])
# Если адреса получили
if isinstance(addresses, list):
session_t = session
session_t.headers.update(
{'referer': 'https://knd.mosreg.ru/executions',
'x-platform': 'wb',
'accept': '*/*'
}
)
for address in addresses:
# change '-' for '/'
# need split enterance from address
search = address.replace("_", "/")[:-4]
# print(search)
data = {'search': search}
tasks = session_t.post('https://knd.mosreg.ru//api/v1/executions',
headers=headers,
data=data
)
json_tasks = tasks.json()
get_t = False
# Если есть доступное задание с таким адресом
exec_assigned = json_tasks['executions_assigned']
exec_available = json_tasks['executions_available']
if len(exec_assigned) > 0:
for n in exec_assigned:
coord = n['coord']
enterances = n['tasks']
for execution in enterances:
address_from_tasks = execution['dimensions'][0].get('entity_value_name')
# print(address_from_tasks)
# print('Клин г, '+address[:-4])
enterance = execution['dimensions'][2].get('entity_value_name')
# print(enterance)
# print(address[-2])
if address_from_tasks == 'Клин г, '+address.replace("_", "/")[:-4] and \
enterance[-1] == address[-2]:
print(f'Начали выполнение задания по адресу: {address}')
task = {
'id': execution['execution_id'],
'address': execution['dimensions'][0].get('entity_value_name'),
'enterance': execution['dimensions'][2].get('entity_value_name'),
'coord': coord
}
# Принимаем задание
assign_task(session, headers, task, address)
get_t = True
else:
print(address_from_tasks + f" не соответствует адресу: {address}")
if not get_t:
print(f"Нет доступных заданий для по адресу: {address}!")
# Если нет задания с таким адресом
else:
print(f"Нет доступных заданий для пользователя {headers['uid']} по адресу: {address}")
# Если адреса не получили выводим ошибку
else:
print(addresses)
def main():
users = Users()
logins = users.get_passwords()
if isinstance(logins, list):
url = 'https://knd.mosreg.ru//api/v1/auth/sign_in'
for user in logins:
session = requests.Session()
login = {
'email': user['email'],
'password': user['password']
}
response = session.post(url, data=login)
headers = {
'client': response.headers.get('client'),
'Access-token': response.headers.get('Access-token'),
'uid': response.headers.get('uid'),
}
if response.status_code == 200:
# task = {
# 'id': 23799105,
# 'address': 'Решетниково рп, ОПМС-1 проезд, 13',
# 'enterance': 6,
# 'coord': [56.442503, 36.5596493]
# }
# address = 'Решетниково рп, ОПМС-1 проезд, 13, 6п'
# assign_task(session, headers, task, address)
# break
get_task(session, headers)
else:
print(f"Отказ в авторизации для пользователя {user['email']}!")
time.sleep(1)
else:
print(users.get_passwords())
if __name__ == "__main__":
main()
k = input("Press ENTER for exit")
def random_string(stringLength):
letters = string.ascii_letters
digits = string.digits
return ''.join(random.choice(letters+digits) for i in range(stringLength))

BIN
new_template.docx Normal file

Binary file not shown.

13
requirements.txt Normal file
View File

@ -0,0 +1,13 @@
certifi==2020.4.5.2
chardet==3.0.4
docx==0.2.4
geographiclib==1.50
idna==2.9
lxml==4.5.2
pdf2image==1.14.0
Pillow==7.2.0
python-docx==0.8.10
requests==2.23.0
requests-toolbelt==0.9.1
six==1.15.0
urllib3==1.25.9