-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfens2cdb.py
193 lines (181 loc) · 6.37 KB
/
fens2cdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
Script to bulk-evaluate FENs with chessdb.cn.
"""
import argparse, asyncio, sys, time, cdblib
class fens2cdb:
def __init__(
self,
filename,
output,
shortFormat,
quiet,
enqueue,
concurrency,
user,
suppressErrors,
):
self.input = filename
self.lines = []
self.scored = 0
with cdblib.open_file_rt(filename) as f:
for line in f:
line = line.strip()
if line:
self.lines.append(line)
if not line.startswith("#"): # ignore comments
self.scored += 1
if output:
self.output = open(output, "w")
self.display = sys.stdout
else:
self.output = sys.stdout
self.display = sys.stderr
if quiet:
self.display = None
if self.display:
print(
f"Read {self.scored} FENs from file {self.input}.",
file=self.display,
flush=True,
)
self.shortFormat = shortFormat
self.enqueue = enqueue
self.concurrency = concurrency
self.cdb = cdblib.cdbAPI(concurrency, user, not suppressErrors)
self.unknown = cdblib.AtomicInteger()
async def parse_all(self, batchSize=None):
if self.display:
print(
f"Started parsing the FENs with concurrency {self.concurrency}"
+ (" ..." if batchSize == None else f" and batch size {batchSize} ..."),
file=self.display,
flush=True,
)
if batchSize is None:
batchSize = len(self.lines)
self.tic = time.time()
for i in range(0, len(self.lines), batchSize):
tasks = []
for line in self.lines[i : i + batchSize]:
tasks.append(asyncio.create_task(self.parse_single_line(line)))
for parse_line in tasks:
print(await parse_line, file=self.output)
if self.display:
elapsed = time.time() - self.tic
print(
f"Done. Scored {self.scored} FENs in {elapsed:.1f}s.", file=self.display
)
if self.unknown.get():
print(
f"The file {self.input} contained {self.unknown.get()} new chessdb.cn positions.",
file=self.display,
)
if self.enqueue == 0:
print(
"Rerunning the script after a short break should provide their evals.",
file=self.display,
)
elif self.enqueue == 1:
print(
"They have now been queued for analysis.",
file=self.display,
)
else:
print(
"They have been queued for analysis, and their evals have been obtained.",
file=self.display,
)
async def parse_single_line(self, line):
if line.startswith("#"): # ignore comments
return line
fen = " ".join(line.split()[:4]) # cdb ignores move counters anyway
r = await self.cdb.queryscore(fen)
score = cdblib.json2eval(r)
if r.get("status") == "unknown" and score == "":
self.unknown.inc()
timeout = 5
while self.enqueue and r["status"] == "unknown":
r = await self.cdb.queue(fen)
if self.enqueue >= 2:
await asyncio.sleep(timeout)
r = await self.cdb.queryscore(fen)
score = cdblib.json2eval(r)
if timeout < 120:
timeout = min(timeout * 1.5, 120)
if self.shortFormat:
if score == "mated":
score = "#"
elif type(score) != int:
_, M, ply = score.partition("M")
if M == "" or not ply.isnumeric():
score = ""
elif score != "":
if "ply" in r:
score = f"{score}, ply: {r['ply']}"
score = f"cdb eval: {score}"
return f"{line}{' ;' if line[-1] != ';' else ''} {score};"
async def main():
parser = argparse.ArgumentParser(
description='A simple script to request evals from chessdb.cn for a list of FENs stored in a file. The script will add "; EVALSTRING;" to every line containing a FEN. Lines beginning with "#" are ignored, as well as any text after the first four fields of each FEN.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"input", help="source filename with FENs (w/ or w/o move counters)"
)
parser.add_argument("output", nargs="?", help="optional destination filename")
parser.add_argument(
"--shortFormat",
action="store_true",
help='EVALSTRING will be just a number, or an "M"-ply mate score, or "#" for checkmate, or "".',
)
parser.add_argument(
"--quiet",
action="store_true",
help="Suppress all unnecessary output to the screen.",
)
parser.add_argument(
"-e",
"--enqueue",
action="count",
default=0,
help="-e queues unknown positions once, -ee until an eval comes back.",
)
parser.add_argument(
"-c",
"--concurrency",
help="Maximum concurrency of requests to cdb.",
type=int,
default=16,
)
parser.add_argument(
"-b",
"--batchSize",
help="Number of FENs processed in parallel. Small values guarantee more responsive output, large values give faster turnaround.",
type=int,
default=None,
)
parser.add_argument(
"-u",
"--user",
help="Add this username to the http user-agent header.",
)
parser.add_argument(
"-s",
"--suppressErrors",
action="store_true",
help="Suppress error messages from cdblib.",
)
args = parser.parse_args()
f2c = fens2cdb(
args.input,
args.output,
args.shortFormat,
args.quiet,
args.enqueue,
args.concurrency,
args.user,
args.suppressErrors,
)
await f2c.parse_all(args.batchSize)
if __name__ == "__main__":
asyncio.run(main())