File size: 14,926 Bytes
f082004
 
 
 
 
 
 
 
 
 
 
 
 
47d9ef1
f082004
47d9ef1
 
 
f082004
47d9ef1
f082004
47d9ef1
 
 
 
f082004
47d9ef1
f082004
 
47d9ef1
 
 
 
 
 
 
 
f082004
 
 
 
47d9ef1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f082004
47d9ef1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f082004
 
 
47d9ef1
 
 
 
 
f082004
47d9ef1
 
 
 
f082004
 
47d9ef1
58596cd
47d9ef1
 
 
 
 
 
 
58596cd
47d9ef1
 
 
 
 
 
ac97ee4
47d9ef1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58596cd
 
 
 
 
47d9ef1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58596cd
47d9ef1
58596cd
47d9ef1
 
 
ac97ee4
47d9ef1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58596cd
 
 
47d9ef1
 
 
 
58596cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f082004
 
 
 
47d9ef1
 
 
 
 
 
 
 
 
 
 
 
f082004
 
 
 
 
 
 
47d9ef1
 
 
 
 
 
 
 
f082004
47d9ef1
 
f082004
 
 
47d9ef1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f082004
 
47d9ef1
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Symbolic Judge: Verifiable Rewards for Logical Reasoning at Scale """

import os
import subprocess
import tempfile
import evaluate
import logging
import datasets
from tqdm import tqdm
import time
import multiprocessing as mp
import re

logger = logging.getLogger(__name__)

_CITATION = """\
@misc{helff2025slrautomatedsynthesisframework,
      title={SLR: An Automated Synthesis Framework for Scalable Logical Reasoning}, 
      author={Lukas Helff and Ahmad Omar and Felix Friedrich and Wolfgang Stammer and Antonia Wüst and Tim Woydt and Rupert Mitchell and Patrick Schramowski and Kristian Kersting},
      year={2025},
      eprint={2506.15787},
      archivePrefix={arXiv},
      primaryClass={cs.AI},
      url={https://arxiv.org/abs/2506.15787}, 
}
"""

_DESCRIPTION = """\
Verifiable Rewards for Scalable Logical Reasoning (SLR) introduces a symbolic judge that provides verifiable rewards for logical reasoning tasks.
To check whether a given task is solved, the symbolic judge evaluates a candidate solution (i.e., a logic rule, typically generated by a language model) using an executable validation program that encodes the task's background knowledge and labeled examples.
Evaluations performed by the symbolic judge are fully verifiable and grounded in formal logic, ensuring an automatic, transparent, and reproducible standard for evaluation and reward in both supervised and reinforcement learning settings.

How it Works:
  - Input: The symbolic judge takes as input a candidate hypothesis (logic rule) and an executable validation program containing background knowledge and examples.
  - Execution: The candidate rule is executed against the validation program using a Prolog interpreter.
  - Correctness Criteria: The rule is considered correct if it entails all positive examples and rejects all negative examples.
  - Metrics: The symbolic judge computes a range of evaluation metrics (detailed below).
  - Usage: see **Documentation tab** for details on how to use the symbolic judge in your own projects.

Example usage:
  from evaluate import load

  symbolic_judge = load("AIML-TUDA/VerifiableRewardsForScalableLogicalReasoning")

  validation_program = \"\"\"
  eastbound(train0).
  has_car(train0, car0_1).
  car_num(car0_1, 1).
  car_color(car0_1, white).
  car_len(car0_1, short).
  has_wall(car0_1, full).

  westbound(train1).
  has_car(train1, car1_1).
  car_num(car1_1, 1).
  car_color(car1_1, yellow).
  car_len(car1_1, short).
  has_wall(car1_1, full).
  \"\"\"

  predicted_rule = "eastbound(Train):- has_car(Train, Car1), car_color(Car1, white)."

  results = symbolic_judge.compute(
      predictions=[predicted_rule],
      references=[{"validation_program": validation_program,
                   "evaluation_config": {
                       "positive_predicate": "eastbound",
                       "negative_predicate": "westbound"
                   }}]
  )

Note: A local Prolog interpreter is required to execute validation programs.
"""

_KWARGS_DESCRIPTION = """
Args:
    predictions (`list` of `str`): Each prediction should be a Prolog rule like "pred(T) :- Body."
    references (`list` of `dict`): Each reference should contain:
        - 'validation_program' (`str`): Background knowledge in Prolog syntax
        - 'evaluation_config' (`dict`, optional): Configuration of predicates to use for evaluation.
        Define: positive_predicate, and negative_predicate
Returns:
    accuracy (`float`): The proportion of predictions that correctly solve all examples. Value is between 0 and 1.
    partial_score (`float`): Average proportion of correctly classified examples across all predictions. Value is between 0 and 1.
    syntax_score (`float`): Proportion of rules with valid syntax. Value is between 0 and 1.
    detailed_results (`list` of `dict`): Per-example results including correctness, partial score, execution time, and any errors encountered.
"""


def _evaluate_with_prolog(prediction, validation_program, eval_config, timeout=5):
    """
    Evaluates a predicted rule against the validation program using Prolog.
    """
    # Extract configuration
    positive_pred = eval_config.get("positive_predicate", "eastbound")
    negative_pred = eval_config.get("negative_predicate", "westbound")
    # extract predicate from rule_to_evaluate
    rule_to_evaluate = extract_ilp_from_text_v2(prediction)
    if positive_pred not in rule_to_evaluate:
        logger.warning(f"Rule '{rule_to_evaluate}' does not contain positive predicate '{positive_pred}'")
        return {
            "is_correct": False,
            "partial_score": 0.0,
            "syntax_valid": False,
            "error": f"Invalid Syntax: Logic Rule not found for symbol '{positive_pred}'"
        }

    pos_examples = re.findall(rf'{positive_pred}\(([^)]+)\)', validation_program)
    neg_examples = re.findall(rf'{negative_pred}\(([^)]+)\)', validation_program)

    # Determine arity by counting commas in first example plus 1
    arity = 1  # default to unary
    if pos_examples:
        arity = pos_examples[0].count(',') + 1
    elif neg_examples:
        arity = neg_examples[0].count(',') + 1

    # Create variables based on arity
    vars = ", ".join([f"X{i}" for i in range(1, arity + 1)])

    symbolic_judge = f"""
% Dynamic evaluation predicates
check({vars}) :- pos({vars}), {positive_pred}({vars}).      % positive covered
check({vars}) :- neg({vars}), \\+ {positive_pred}({vars}).  % negative rejected

% Count successful checks
check_count(Count) :- 
    (setof(({vars}), ((pos({vars}); neg({vars})), check({vars})), CorrectExamples) ->
        length(CorrectExamples, Count)
    ;
        Count = 0
    ).

check_all :- forall((pos({vars});neg({vars})), check({vars})).
    """
    # Add the rule to evaluate
    validation_program = re.sub(rf'\b{positive_pred}\b', 'pos', validation_program)
    validation_program = re.sub(rf'\b{negative_pred}\b', 'neg', validation_program)

    pos_negs = validation_program.count("pos(") + validation_program.count("neg(")
    validation_program = '\n'.join(sorted(validation_program.splitlines()))
    full_program = validation_program + "\n\n" + symbolic_judge + "\n\n" + rule_to_evaluate + "\n\n"

    with tempfile.NamedTemporaryFile(suffix='.pl', mode='w', delete=False) as f:
        f.write(full_program)
        temp_file = f.name

    try:
        eval_start_time = time.time()
        # Execute the Prolog program
        cmd = ['swipl', '-s', temp_file, '-g', 'check_count(Count), writeln(Count)', '-t', 'halt']
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
            text=True
        )
        partial_score = 0.0 if result.stdout.strip() == '' else int(result.stdout.strip())
        # Extract partial score from output
        partial_score = partial_score / pos_negs if pos_negs > 0 else 0.0

        is_correct = True if partial_score == 1.0 else False

        error = f'Rule invalid: "{rule_to_evaluate}" exit with ' + result.stderr if result.stderr else None
        t1 = time.time()

        return {
            "is_correct": is_correct,
            "partial_score": partial_score,
            "syntax_valid": True,
            "error": error,
            "exec_time1": t1 - eval_start_time,
        }

    except subprocess.TimeoutExpired:
        logger.warning(f"Evaluation timed out after {timeout} seconds for rule: {rule_to_evaluate}...")
        return {"is_correct": False, "partial_score": 0.0, "syntax_valid": False,
                "error": f"Evaluation timed out after {timeout} seconds"}
    except Exception as e:
        logger.warning(f"Error evaluating rule '{rule_to_evaluate}' returns: '{result.stdout.strip() if result else 'No error message'}' with error: {e}")
        return {"is_correct": False, "partial_score": 0.0, "syntax_valid": False,
                "error": f"Syntactically invalid rule '{rule_to_evaluate}'"}
    finally:
        if os.path.exists(temp_file):
            os.remove(temp_file)

def extract_ilp_from_text(text):
    rule_patterns = [
        # Pattern with body (full rule with implication)
        r'([a-zA-Z_][a-zA-Z0-9_]*\([^)]*\)\s*:-[^.]*\.)',
        # Pattern for facts (no body)
        # r'([a-zA-Z_][a-zA-Z0-9_]*\([^)]*\)\s*\.)'
    ]
    p_code = ''
    for pattern in rule_patterns:
        matches = re.findall(pattern, text)
        for match in matches:
            # Ensure the rule ends with a period
            statement = match.strip()
            if not statement.endswith('.'):
                statement += '.'
            p_code += statement + '\n'
    return p_code


def extract_ilp_from_text_v2(text, target_predicates=None):
    # Pre-process: collapse code blocks to single lines
    text = re.sub(r'\n\s*', ' ', text)  # crude: flatten all to one line
    # Optionally restrict to only some predicates
    preds = '|'.join([re.escape(p) for p in (target_predicates or [])])
    head_pat = rf"(?:{preds})" if preds else r"[a-zA-Z_][a-zA-Z0-9_]*"
    # Rule pattern, across newlines
    rule_pattern = re.compile(rf'({head_pat}\([^()]*\)\s*:-.*?\.)')
    rules = set(rule_pattern.findall(text))
    # Remove rules that are also captured as facts
    p_code = ''
    for rule in rules:
        # Ensure the rule ends with a period
        statement = rule.strip()
        if not statement.endswith('.'):
            statement += '.'
        p_code += statement + '\n'
    return p_code.strip()  # Ensure no trailing whitespace


@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class VerifiableRewardsForScalableLogicalReasoning(evaluate.Metric):
    def __init__(self, config_name=None, **kwargs):
        """
        Initializes the PrologEval metric.

        Args:
            config_name (str, optional): Name of the configuration to use.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(config_name=config_name, **kwargs)
        self.config_name = config_name or "default"
        self._info = self._info()
        self._download_and_prepare(dl_manager=None)

    def _info(self):
        return evaluate.MetricInfo(
            description=_DESCRIPTION,
            citation=_CITATION,
            inputs_description=_KWARGS_DESCRIPTION,
            features=datasets.Features({
                'predictions': datasets.Value('string'),
                'references': {
                    'validation_program': datasets.Value('string'),
                    'evaluation_config': {
                        'positive_predicate': datasets.Value('string'),
                        'negative_predicate': datasets.Value('string')
                    }
                },
            }),
            codebase_urls=["https://github.com/AIML-TUDA/SLR-Bench"],
            reference_urls=["https://huggingface.co/datasets/AIML-TUDA/SLR-Bench"]
        )

    def _download_and_prepare(self, dl_manager):
        """Checks if SWI-Prolog is installed or warns the user."""
        try:
            subprocess.run(
                ["swipl", "--version"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True
            )
        except (subprocess.CalledProcessError, FileNotFoundError):
            logger.warning(
                "SWI-Prolog not found. Please install it:\n"
                "Ubuntu/Debian: sudo apt-get install swi-prolog\n"
                "macOS: brew install swi-prolog\n"
                "Windows: download from https://www.swi-prolog.org/download/stable"
            )

    def _compute(self, predictions: list, references: list):
        """Calculates the accuracy of predictions using Prolog for evaluation with multiprocessing."""
        if not isinstance(predictions, list):
            predictions = [predictions]

        if len(predictions) != len(references):
            raise ValueError(
                f"Number of predictions ({len(predictions)}) and references {len(references)}) don't match")

        # Prepare evaluation inputs
        eval_inputs = []
        for i, (prediction, reference) in enumerate(zip(predictions, references)):
            validation_program = reference.get("validation_program", reference.get("validation program"))

            # Extract configuration parameters directly from reference
            # This is the key fix: look for config values at the top level if evaluation_config doesn't exist
            eval_config = reference.get("evaluation_config", {
                    "positive_predicate": "eastbound",
                    "negative_predicate": "westbound"
                })

            if not validation_program:
                raise ValueError(f"Example {i} does not contain validation program field")

            eval_inputs.append((prediction, validation_program, eval_config))

        # Process evaluations in parallel
        num_cpus = max(1, mp.cpu_count() - 1)  # Leave one CPU free
        with mp.Pool(processes=num_cpus) as pool:
            results = list(tqdm(
                pool.starmap(_evaluate_with_prolog, eval_inputs),
                total=len(eval_inputs),
                desc="Evaluating rules (parallel)"
            ))
        # no multiprocessing in the main thread, so we can use tqdm directly
        # results = []
        # for prediction, validation_program, eval_config in tqdm(eval_inputs, total=len(predictions), desc="Evaluating rules"):
        #     results.append(_evaluate_with_prolog(prediction, validation_program, eval_config))

        # Calculate metrics
        partial_scores = [result["partial_score"] for result in results]
        correct_count = sum(1 for result in results if result["is_correct"])
        syntax_valid_count = sum(1 for result in results if result["syntax_valid"])

        accuracy = correct_count / len(predictions) if predictions else 0
        partial_score = sum(partial_scores) / len(predictions) if partial_scores else 0
        syntax_score = syntax_valid_count / len(predictions) if predictions else 0

        return {
            "accuracy": accuracy,
            "partial_score": partial_score,
            "syntax_score": syntax_score,
            "detailed_results": results
        }