-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
138 lines (112 loc) · 4.16 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import cfg
import os
import re
import requests
from werkzeug.utils import secure_filename
import timeit
import time
import openpyxl
import urllib.request
from tqdm import tqdm
from PIL import Image, ImageDraw
# 检查是不是允许上传的文件类型
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in cfg.allowed_extenstions
# 转存上传的图片文件
def save_uploaded_file(raw_files):
for file in raw_files:
if file:
filename = file.filename
if not allowed_file(filename):
continue
filename = secure_filename(filename)
img_path = os.path.join(cfg.image_preview_folder, filename)
file.save(img_path)
# 转发批量文件上传请求到计算识别服务器
def batch_upload(headers, file_data):
start = timeit.default_timer()
response = requests.post(url=cfg.api_qcc_batch_detection,
headers=headers,
data=file_data)
end = timeit.default_timer()
delay = end-start
print('Eclapsed: %s second(s)' % delay)
return response, delay
# 将批量识别的结果保存到表格文件
def write_batch_detection_result(collections):
# 以当前时间作为表格文件名
now = time.strftime("%Y-%m-%d-%H-%M-%S",time.localtime(time.time()))
# 获得表格文件输出路径
output_filepath = os.path.join(cfg.table_output_folder, now + '.xlsx')
# 创建Excel工作本
book = openpyxl.Workbook()
# 创建组
sheet = book.create_sheet('Results', 0)
sheet.cell(1, 1, 'Image')
sheet.cell(1, 2, 'Shop name')
# 将结果逐条写入表格中
for i in range(len(collections)):
sheet.cell(i + 1 + 1, 1, collections[i]['filename'])
sheet.cell(i + 1 + 1, 2, collections[i]['enterprise'])
# 保存表格文件到路径
book.save(output_filepath)
return output_filepath
# 在图片上绘制结果框
def draw_polygon_on_image(img_path, polygon):
image = Image.open(img_path).convert('RGB') # 转换色彩空间
draw = ImageDraw.Draw(image)
draw.line([tuple(polygon[0]),
tuple(polygon[1]),
tuple(polygon[2]),
tuple(polygon[3]),
tuple(polygon[0])], width=cfg.answer_line_width, fill=cfg.answer_line_color)
image.save(img_path)
# 排序用比较函数,先比较文件名中的数字大小,再比较文件名长度
# <<需要加强!>>
def sort_by_filename(elem):
digit_str = re.sub(r'\D', '', elem['filename'])
if len(digit_str) == 0:
return len(elem['filename'])
return int(digit_str)
def process_detection_result(json_data):
collections = []
items = json_data.items()
for key, value in items:
if value is None:
value = ''
detection_result = value
success = detection_result['success']
enterprise = detection_result['enterprise']
polygon = detection_result['polygon']
filename = key
# 处理文件名,去除根目录名
slash_index = filename.find('/')
if slash_index != -1:
filename = filename[slash_index+1:]
# 识别失败
if success == 0:
enterprise = '<failed>'
img_filename = secure_filename(key)
img_path = os.path.join(cfg.image_preview_folder, img_filename)
if polygon and len(polygon) > 0:
draw_polygon_on_image(img_path, polygon)
collection = {
"filename": filename,
"enterprise": enterprise,
"polygon": polygon,
"img_preview": img_path,
}
collections.append(collection)
if len(collections) > 1:
collections.sort(key=sort_by_filename)
return collections
class DownloadProgressBar(tqdm):
def update_to(self, b=1, bsize=1, tsize=None):
if tsize is not None:
self.total = tsize
self.update(b * bsize - self.n)
def download_url(url, output_path):
with DownloadProgressBar(unit='B', unit_scale=True,
miniters=1, desc=url.split('/')[-1]) as t:
urllib.request.urlretrieve(url, filename=output_path, reporthook=t.update_to)