Skip to content
Snippets Groups Projects

proof-state recorder tweaks and fixes

Merged Björn Brandenburg requested to merge bbb/prosa:proof-state-rec-fixes into master
1 file
+ 87
12
Compare changes
  • Side-by-side
  • Inline
@@ -27,6 +27,7 @@ import argparse
import sys
import os
import re
import time
from subprocess import Popen, PIPE
from select import select
@@ -56,6 +57,12 @@ def statement_end_offsets(opts, src):
else:
return False
def prev_is(i, c):
if i > 0:
return src[i - 1] == c
else:
return False
def next_is_whitespace(i):
if i + 1 < len(src):
return src[i + 1].isspace()
@@ -140,10 +147,15 @@ def statement_end_offsets(opts, src):
# look for closing braces -- this is a brittle heuristic, but
# we need to catch sub-proofs because coqtop in emacs mode
# produces a prompt every time we enter a sub-proof
elif not in_comment and in_proof and cur_is(i, '}') \
and next_is_whitespace(i) and prev_is_whitespace(i):
yield i + 1
last = i + 1
elif not in_comment and in_proof and cur_is(i, '}') and \
(next_is_whitespace(i) or next_is (i, '}')) and \
(prev_is_whitespace(i) or prev_is(i, '}')):
if next_is_whitespace(i):
yield i + 1
last = i + 1
else:
yield i
last = i
# similarly, look for opening braces
elif not in_comment and in_proof and cur_is(i, '{') \
and next_is_whitespace(i) and prev_is_whitespace(i):
@@ -198,6 +210,8 @@ def read_from_pipe(pipe, timeout, seen_enough = lambda _: False, expect_timeout=
output += data
return output.decode("utf-8")
COQ_PROMPT_RE = re.compile(r"<prompt>\S* < ([0-9]+) ")
def wait_for_prompt(opts, pipe, timeout):
seen_enough = lambda output: output.endswith(END_OF_PROMPT_BYTES)
output = read_from_pipe(pipe, timeout, seen_enough)
@@ -206,18 +220,23 @@ def wait_for_prompt(opts, pipe, timeout):
more = read_from_pipe(pipe, 0, seen_enough, expect_timeout=True)
output += more
if more:
print("unexpected:", more)
print("Unexpected coqtop output:", more)
assert False # we lost sync; this should not be happening
else:
break
# check for interaction number in output
m = COQ_PROMPT_RE.search(output)
prompt_number = int(m.group(1)) if m else -1
# remove any prompts; we don't want to record those
if output.endswith(END_OF_PROMPT) and not opts.verbose:
idx = output.find(START_OF_PROMPT)
assert not START_OF_PROMPT in output[:idx] # only one prompt expected
return output[:idx]
if START_OF_PROMPT in output[idx+1:]:
print("Unexpected number of prompts in coqtop output: \n", output)
assert False # only one prompt expected
return (prompt_number, output[:idx])
else:
return output
return (prompt_number, output)
INFO_MSG_PATTERN = re.compile(
r"""
@@ -235,7 +254,7 @@ def extract_info_messages(output):
def wait_for_coqtop(opts, coqtop):
# wait until prompt (re-)appears
error = wait_for_prompt(opts, coqtop.stderr, opts.timeout)
num, error = wait_for_prompt(opts, coqtop.stderr, opts.timeout)
# read output produced so far
# FIXME: this only works if all output fits into the OS's pipe buffer
output = read_from_pipe(coqtop.stdout, 0)
@@ -246,6 +265,8 @@ def wait_for_coqtop(opts, coqtop):
error = error.rstrip()
if error:
interaction['error'] = error
if num >= 0:
interaction['prompt_number'] = num
return interaction
def interact(opts, coqtop, src, start, end):
@@ -255,7 +276,7 @@ def interact(opts, coqtop, src, start, end):
coqtop.stdin.write(bytes(input.replace('\n', ' ') + '\n', "utf-8"))
coqtop.stdin.flush()
# wait until prompt (re-)appears
error = wait_for_prompt(opts, coqtop.stderr, opts.timeout)
num, error = wait_for_prompt(opts, coqtop.stderr, opts.timeout)
# read output produced so far
# FIXME: this only works if all output fits into the OS's pipe buffer
output = read_from_pipe(coqtop.stdout, 0)
@@ -280,6 +301,9 @@ def interact(opts, coqtop, src, start, end):
if error:
interaction['error'] = error
if num >= 0:
interaction['prompt_number'] = num
return interaction
def report(n, interaction):
@@ -294,6 +318,38 @@ def report(n, interaction):
print("PROMPT: [%s]" % interaction['error'].strip())
print("-" * 80)
def prompt_out_of_sync(opts, input_number, interaction):
if 'prompt_number' in interaction:
if interaction['prompt_number'] != input_number:
print("[%s] Out of sync: prompt %d in response to input %d" %
(opts.fname, interaction['prompt_number'], input_number))
return True
return False
def output_out_of_sync(opts, cmd, interactions):
if cmd == 'Qed.':
# QED: there should be no more proof obligations.
if 'output' in interactions[-2] and \
interactions[-2]['output'] != ['No more subgoals.']:
print("[%s] Out of sync: 'Qed.' before 'No more subgoals.'" % opts.fname)
return True
# QED: there should be no output.
if 'output' in interactions[-1]:
print("[%s] Out of sync: 'Qed.' produced output.'" % opts.fname)
return True
elif cmd.startswith('End '):
# End of section.
if 'output' in interactions[-1]:
print("[%s] Out of sync: 'End' produced output.'" % opts.fname)
return True
return False
# Some vernacular commands that we need to check for because they change
# coqtop's prompt counter. This list is likely incomplete.
VERNACULAR_COMMANDS_RE = re.compile(r"(Opaque|Transparent) \S+\.")
def feed_to_coqtop(opts, src):
coqtop = launch_coqtop(opts)
interactions = []
@@ -304,14 +360,29 @@ def feed_to_coqtop(opts, src):
report(0, interaction)
interactions.append(interaction)
coq_prompt_offset = 0
# feed statements
last = 0
for end in statement_end_offsets(opts, src):
if opts.pause:
time.sleep(0.1)
cmd = src[last:end+1].strip()
if VERNACULAR_COMMANDS_RE.match(cmd):
coq_prompt_offset += 1
interaction = interact(opts, coqtop, src, last, end)
interactions.append(interaction)
last = end + 1
if opts.verbose:
report(len(interactions), interaction)
sync_failure = False
sync_failure |= prompt_out_of_sync(opts, len(interactions) + coq_prompt_offset, interaction)
sync_failure |= output_out_of_sync(opts, cmd, interactions)
if opts.verbose or sync_failure:
report(len(interactions) + coq_prompt_offset, interaction)
assert not sync_failure
# signal end of input
coqtop.stdin.close()
@@ -332,6 +403,7 @@ def feed_to_coqtop(opts, src):
def process(opts, fname):
print("Collecting proof state for %s..." % fname)
opts.fname = fname
src = open(fname, 'r').read()
interactions = feed_to_coqtop(opts, src)
ext = '.proofstate.yaml' if opts.yaml else '.proofstate.json'
@@ -383,6 +455,9 @@ def parse_args():
parser.add_argument('--verbose', default=False, action='store_true',
help='report on interaction with coqtop')
parser.add_argument('--pause', default=False, action='store_true',
help='pause briefly before feeding data to coqtop')
parser.add_argument('--parse-only', default=False, action='store_true',
help='report how the script splits the input '
'(--verbose for per-character info)')
Loading