Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ding/reward_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@
from .guided_cost_reward_model import GuidedCostRewardModel
from .ngu_reward_model import RndNGURewardModel, EpisodicNGURewardModel
from .icm_reward_model import ICMRewardModel
# LLM/VLM reward model and verifier
from .math_reward_model import MathRewardModel
from .math_rule_reward_model import MathRuleRewardModel
151 changes: 151 additions & 0 deletions ding/reward_model/math_reward_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from typing import Tuple, Optional, List, Dict
from easydict import EasyDict
from torch.utils.tensorboard import SummaryWriter
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
import re

from ding.utils import REWARD_MODEL_REGISTRY
from .base_reward_model import BaseRewardModel


@REWARD_MODEL_REGISTRY.register('math')
class MathRewardModel(BaseRewardModel):
# Default configuration of the PRM-based math reward model.
config = {
    # (str) Registry key of this reward model.
    'type': 'math',
    # (str) HuggingFace identifier used to load both the tokenizer and the model.
    'model_name': 'Qwen/Qwen2.5-Math-PRM-7B',
}

def __init__(self, config: EasyDict, device: str, logger, tb_logger: 'SummaryWriter') -> None:  # noqa
    """
    Overview:
        Keep the runtime handles on the instance and load the pretrained tokenizer and model named by \
        ``config.model_name``.
    Arguments:
        - config (:obj:`EasyDict`): Reward model config; ``model_name`` is read from it.
        - device (:obj:`str`): Forwarded to ``AutoModel.from_pretrained`` as ``device_map``.
        - logger: Logger stored as ``self.logger`` (``train`` emits a warning through it).
        - tb_logger (:obj:`SummaryWriter`): TensorBoard writer stored as ``self.tb_logger``.
    """
    self.cfg = config
    self.device = device
    self.logger = logger
    self.tb_logger = tb_logger

    # Initialize the tokenizer and the model.
    # NOTE: ``trust_remote_code=True`` runs Python code shipped with the checkpoint, so ``model_name`` must
    # point to a trusted repository.
    self.tokenizer = AutoTokenizer.from_pretrained(self.cfg.model_name, trust_remote_code=True)
    self.model = AutoModel.from_pretrained(
        self.cfg.model_name, device_map=self.device, torch_dtype=torch.bfloat16, trust_remote_code=True
    )
    # The model is only used for inference.
    self.model.eval()

def make_step_rewards(self, logits: torch.Tensor, token_masks: torch.Tensor) -> List[List[float]]:
    """
    Overview:
        Turn per-token classification logits into one reward per marked token: the softmax probability of \
        label index 1 at every position selected by ``token_masks``.
    Arguments:
        - logits (:obj:`torch.Tensor`): Per-token logits of shape :math:`(B, L, C)` with at least two labels.
        - token_masks (:obj:`torch.Tensor`): Mask of shape :math:`(B, L)`; non-zero / ``True`` marks the \
            positions whose reward is read.
    Returns:
        - rewards (:obj:`List[List[float]]`): For each sample, the rewards of its marked positions in \
            sequence order (an empty list if nothing is marked).
    """
    probabilities = F.softmax(logits, dim=-1)  # bs, seq_len, num_labels
    step_masks = token_masks.to(device=logits.device, dtype=torch.bool)

    # Select the marked rows with the mask itself. Zeroing the unmarked rows and then filtering on
    # ``!= 0`` would also drop a probability that is exactly 0 at a marked position (a saturated
    # softmax), which misaligns the per-row pairing and raises on reshape.
    return [
        sample_probs[sample_mask][:, 1].cpu().tolist()  # valid_tokens
        for sample_probs, sample_mask in zip(probabilities, step_masks)
    ]

def estimate(self, data: List[Dict]) -> List[Dict]:
    """
    Overview:
        Estimate rewards for mathematical reasoning steps with the process reward model loaded in
        ``__init__`` (``Qwen/Qwen2.5-Math-PRM-7B`` in the default config).
    Arguments:
        - data (:obj:`List[Dict]`): List of dictionaries containing:
            - system (:obj:`str`): System prompt for the model
            - query (:obj:`str`): The mathematical query to be evaluated
            - response (:obj:`List[str]`): List of reasoning steps
    Returns:
        - reward (:obj:`List[Dict]`): One dictionary per input sample (``[]`` for empty ``data``) containing:
            - reward (:obj:`float`): Final reward (last step reward, ``0.0`` if no step reward was produced)
            - metadata (:obj:`Dict`): Additional information including:
                - query (:obj:`str`): Original query
                - step_rewards (:obj:`List[float]`): Rewards for each reasoning step
                - num_steps (:obj:`int`): Number of reasoning steps in the input ``response``
    Shapes:
        - input_ids (:obj:`torch.LongTensor`): :math:`(B, L)`, where B is batch size and L is sequence length
        - attention_mask (:obj:`torch.LongTensor`): :math:`(B, L)`
        - outputs[0] (:obj:`torch.FloatTensor`): consumed by ``make_step_rewards`` as per-token logits,
          presumably :math:`(B, L, C)` with C labels -- TODO confirm against the model implementation
        - token_masks (:obj:`torch.BoolTensor`): :math:`(B, L)`
        - step_rewards (:obj:`List[List[float]]`): List of length B, each containing S rewards where S is num steps
    Examples:
        >>> data = [{
        >>>     "system": "Please reason step by step...",
        >>>     "query": "What is 1 + 1?",
        >>>     "response": ["First, we have 1", "Then add 1", "Therefore, 1 + 1 = 2"]
        >>> }]
        >>> results = model.estimate(data)
        >>> print(results[0]["reward"])  # e.g. 1.0
        >>> print(results[0]["metadata"]["step_rewards"])  # e.g. [0.8, 0.9, 1.0]
    """
    # Nothing to score; this also avoids handing an empty batch to the tokenizer.
    if not data:
        return []

    # Build one conversation per sample. Every reasoning step is terminated by the ``<extra_0>``
    # separator; the step rewards are read at these separator positions.
    all_messages = [
        [
            {
                "role": "system",
                "content": item['system']
            },
            {
                "role": "user",
                "content": item['query']
            },
            {
                "role": "assistant",
                "content": "<extra_0>".join(item['response']) + "<extra_0>"
            },
        ] for item in data
    ]

    # Render every conversation into the model's chat format.
    conversation_strs = [
        self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False)
        for messages in all_messages
    ]

    # Encode the whole batch at once.
    # NOTE(review): with ``truncation=True`` an over-long sample may lose trailing separators, in which
    # case fewer step rewards than ``num_steps`` are returned -- confirm this is acceptable for callers.
    encoded = self.tokenizer(conversation_strs, return_tensors="pt", padding=True, truncation=True)
    input_ids = encoded["input_ids"].to(self.model.device)
    # Samples of different lengths are padded, so tell the model which positions are real tokens.
    attention_mask = encoded.get("attention_mask")
    if attention_mask is not None:
        attention_mask = attention_mask.to(self.model.device)

    # Run the model on the batch without tracking gradients.
    with torch.no_grad():
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)

    # Compute the step rewards of every sample at the separator positions.
    step_sep_id = self.tokenizer.encode("<extra_0>")[0]
    token_masks = (input_ids == step_sep_id)
    batch_rewards = self.make_step_rewards(outputs[0], token_masks)

    # Assemble the detailed result dictionaries.
    results = []
    for item, step_rewards in zip(data, batch_rewards):
        results.append(
            {
                # The reward of the last step is used as the overall reward.
                "reward": step_rewards[-1] if step_rewards else 0.0,
                "metadata": {
                    "query": item['query'],
                    "step_rewards": step_rewards,  # reward of each step
                    "num_steps": len(item['response']),
                }
            }
        )

    return results

def train(self):
    """
    Overview:
        Only log a warning: this reward model relies on a pre-trained model, so no training is implemented.
    """
    self.logger.warning("Training is not implemented for this reward model")

def collect_data(self, data: list) -> None:
    """
    Overview:
        No-op: this reward model does not need to collect data.
    Arguments:
        - data (:obj:`list`): Ignored.
    """
    return None

def clear_data(self) -> None:
    """
    Overview:
        No-op: this reward model keeps no collected data that would need clearing.
    """
    return None
123 changes: 123 additions & 0 deletions ding/reward_model/math_rule_reward_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from typing import Tuple, Optional, List, Dict
from easydict import EasyDict
from torch.utils.tensorboard import SummaryWriter
from transformers import AutoTokenizer
import re

from ding.utils import REWARD_MODEL_REGISTRY
from .base_reward_model import BaseRewardModel


@REWARD_MODEL_REGISTRY.register('math_rule')
class MathRuleRewardModel(BaseRewardModel):
# Default configuration of the rule-based math reward model.
config = {
    # (str) Registry key of this reward model.
    'type': 'math_rule',
    # (str) Dataset name, usually a huggingface dataset name.
    'dataset_name': '',
    # (str) Tokenizer name, usually a huggingface tokenizer name.
    'tokenizer_name': '',
    # (float) Score given for a format error.
    'format_error_reward': -2,
    # (float) Score given for an answer error.
    'answer_error_reward': -1,
    # (float) Score given for a correct answer.
    'correct_reward': 1,
}

def __init__(
        self,
        config: EasyDict,
        device: str = 'cpu',
        logger=None,
        tb_logger: Optional['SummaryWriter'] = None,
) -> None:  # noqa
    """
    Overview:
        Store the config and the runtime handles on the instance. No model or dataset is loaded here.
    Arguments:
        - config (:obj:`EasyDict`): Reward model config.
        - device (:obj:`str`): Device name, stored as ``self.device``. Defaults to ``'cpu'``.
        - logger: Optional logger, stored as ``self.logger``.
        - tb_logger (:obj:`SummaryWriter`): Optional TensorBoard writer, stored as ``self.tb_logger``.
    """
    # The extra handles are optional so that the model can be built from a config alone
    # (e.g. ``MathRuleRewardModel(config=cfg)``); existing four-argument calls keep working.
    self.cfg = config
    self.device = device
    self.logger = logger
    self.tb_logger = tb_logger

def estimate(self, data: List[str]) -> List[Dict]:
    """
    Overview:
        Placeholder for the rule-based reward estimation. The body is not implemented yet, so the call
        currently returns ``None`` instead of the annotated list.
    Arguments:
        - data (:obj:`List[str]`): The list of data queries used for estimation, each query is a string
          of the form "1 + 1 = ?"
    Returns:
        - reward (:obj:`List[Dict]`): The estimated reward (once implemented).
    """
    # Planned pipeline:
    #   1. parse the query to get question and predicted answer
    #   2. get the ground truth answer according to the question
    #   3. calculate the reward based on the predicted answer and the ground truth answer
    #      (format error -2, answer error -1, correct 1)
    return None

# rule-based reward model does not need training, thus the following methods are empty
def train(self):
    """
    Overview:
        No-op: the rule-based reward model has nothing to train.
    """
    return None

def collect_data(self, data: list) -> None:
    """
    Overview:
        No-op: the rule-based reward model does not collect training data.
    Arguments:
        - data (:obj:`list`): Ignored.
    """
    return None

def clear_data(self) -> None:
    """
    Overview:
        No-op: the rule-based reward model stores no data to clear.
    """
    return None


def strip_sequence(text: str, pad_token: Optional[str], eos_token: Optional[str]) -> str:
    """
    Overview:
        Remove leading and trailing sequences of padding/eos tokens from a text.

    .. note::
        All consecutive occurrences of the given tokens are stripped from the very beginning and the
        very end of the input text. Tokens in the middle of the text are preserved, including a token
        that is followed by a final newline. A token that is ``None`` or empty (e.g. a tokenizer
        without a pad token) is ignored.

    Arguments:
        - text (str): The input text to be processed.
        - pad_token (str): The padding token to be stripped (e.g., "<PAD>").
        - eos_token (str): The end-of-sequence token to be stripped (e.g., "<EOS>").

    Returns:
        - cleaned_text (str): The cleaned text with leading/trailing padding/eos tokens removed.

    Examples:
        >>> strip_sequence("<PAD><EOS>Hello<EOS><PAD>", "<PAD>", "<EOS>")
        'Hello'

        >>> strip_sequence("Test<EOS>Middle<PAD>Keep", "<PAD>", "<EOS>")
        'Test<EOS>Middle<PAD>Keep'

        >>> strip_sequence("<EOS><EOS><PAD>Full removal<PAD><EOS>", "<PAD>", "<EOS>")
        'Full removal'

        >>> strip_sequence("No tokens here", "<PAD>", "<EOS>")
        'No tokens here'

        >>> strip_sequence("<PAD><PAD>", "<PAD>", "<EOS>")
        ''
    """
    # Escape the tokens so that regex metacharacters in them are matched literally.
    escaped_tokens = [re.escape(token) for token in (eos_token, pad_token) if token]
    if not escaped_tokens:
        return text
    token_run = f"(?:{'|'.join(escaped_tokens)})+"

    # Anchor with \A and \Z rather than ^ and $: ``$`` also matches just before a final newline,
    # which would strip a token that is not actually at the end of the text.
    text = re.sub(rf"\A{token_run}", "", text)
    return re.sub(rf"{token_run}\Z", "", text)


def normalize_text(text: str) -> str:
    """
    Overview:
        This function is designed to standardize text by:
        - Converting all text to lowercase
        - Replacing various punctuation marks and special characters with spaces
        - Removing aliased import lines of the form ``import x as y`` followed by a newline \
          (a plain ``import x`` line is left in place by the current pattern)
        - Normalizing whitespace by replacing multiple spaces with a single space
        - Stripping leading and trailing whitespace
    Arguments:
        - text (str): The input text to be processed.
    Returns:
        - normalized_text (str): The normalized text.
    """
    # The patterns are raw strings so that regex escapes such as ``\[`` and ``\s`` reach the regex
    # engine unchanged instead of being invalid string escapes (SyntaxWarning on Python 3.12+).
    # NOTE(review): the repeated ``!``, ``()``, ``:`` and ``?`` in the character class look like
    # full-width CJK punctuation that was normalized to ASCII somewhere -- confirm against upstream.
    text = re.sub(r"""[,.:"'\[\]\-=\+\|!@#$%^&*();<>?/!¥…()—\{\}:”“《》?]""", " ", text.lower())
    # Runs after the punctuation pass, so dots in module names have already become spaces.
    text = re.sub(r"import\s[a-zA-Z\.]+(\sas\s[a-zA-Z\.]+)\n", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()
87 changes: 87 additions & 0 deletions ding/reward_model/tests/test_math_reward_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import pytest
from easydict import EasyDict
import torch
from unittest.mock import MagicMock

from ding.reward_model import MathRewardModel


@pytest.mark.envtest
def test_math_reward_model():
    """
    Overview:
        End-to-end check of ``MathRewardModel.estimate``: result structure, number of step rewards and
        reward ranges for single-sample calls and for a batched call whose samples differ in length.
    """
    # Create configuration
    cfg = EasyDict(dict(
        type='math',
        model_name='Qwen/Qwen2.5-Math-PRM-7B',
    ))

    # Create mock logger and tb_logger
    logger = MagicMock()
    tb_logger = MagicMock()

    # Initialize reward model
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = MathRewardModel(cfg, device, logger, tb_logger)

    # Test case 1: Simple math problem
    data_simple = [
        {
            "system": "Please reason step by step...",
            "query": "What is 1 + 1?",
            "response": ["First, we have 1", "Then add 1", "Therefore, 1 + 1 = 2"]
        }
    ]

    # Test case 2: Complex word problem
    data_complex = [
        {
            "system": "Please reason step by step, and put your final answer within \\boxed{}.",
            "query": "Sue lives in a fun neighborhood...",
            "response": [
                "To find out how many more pink plastic flamingos...",
                "On Saturday, they take back one third of the flamingos...",
                "On Sunday, the neighbors add another 18 pink plastic flamingos...",
                "To find the difference, subtract the number of white flamingos..."
            ]
        }
    ]

    def check_result(result, expected_steps):
        # Structure, step count and value range of a single estimate() result.
        assert "reward" in result, "Result should contain reward"
        assert "metadata" in result, "Result should contain metadata"
        assert "step_rewards" in result["metadata"], "Metadata should contain step_rewards"
        step_rewards = result["metadata"]["step_rewards"]
        assert len(step_rewards) == expected_steps, f"Should have {expected_steps} step rewards"
        assert result["metadata"]["num_steps"] == expected_steps, f"Should have {expected_steps} steps"
        assert 0 <= result["reward"] <= 1, "Reward should be between 0 and 1"
        for step_reward in step_rewards:
            assert 0 <= step_reward <= 1, "Step rewards should be between 0 and 1"

    # Test simple case
    results_simple = model.estimate(data_simple)
    assert len(results_simple) == 1, "Should return one result"
    check_result(results_simple[0], 3)

    # Test complex case
    results_complex = model.estimate(data_complex)
    assert len(results_complex) == 1, "Should return one result"
    check_result(results_complex[0], 4)

    # Test batch processing functionality. The two samples have different lengths, so this is the
    # call that exercises padding; every sample must still yield its own number of step rewards.
    batch_data = data_simple + data_complex
    batch_results = model.estimate(batch_data)
    assert len(batch_results) == 2, "Should return two results for batch processing"
    for result, expected_steps in zip(batch_results, (3, 4)):
        check_result(result, expected_steps)

    # Print detailed information for debugging
    print("\nSimple problem results:")
    print(f"Final reward: {results_simple[0]['reward']}")
    print(f"Step rewards: {results_simple[0]['metadata']['step_rewards']}")

    print("\nComplex problem results:")
    print(f"Final reward: {results_complex[0]['reward']}")
    print(f"Step rewards: {results_complex[0]['metadata']['step_rewards']}")
20 changes: 20 additions & 0 deletions ding/reward_model/tests/test_math_rule_reward_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest
from easydict import EasyDict

from ding.reward_model import MathRuleRewardModel


@pytest.mark.envtest
def test_math_rule_reward_model():
    """
    Overview:
        Smoke test for ``MathRuleRewardModel``: build it from a config and check that ``estimate``
        returns one reward per query.
    """
    # Pass the runtime handles explicitly so that the constructor call is complete.
    reward_model = MathRuleRewardModel(
        config=EasyDict(
            dataset_name='RUC-AIBOX/STILL-3-Preview-RL-Data',
            tokenizer_name='unsloth/Meta-Llama-3.1-8B',
        ),
        device='cpu',
        logger=None,
        tb_logger=None,
    )

    data = [
        "The school now introduces a new color, silver, for the flag design. Crestview's school colors are now purple, gold, and silver. The students are designing a flag using three solid-colored horizontal stripes. Using one, two, or all three of the school colors, how many different flags are possible if adjacent stripes may be the same color?",  # noqa
    ]
    rewards = reward_model.estimate(data)
    # NOTE(review): this only holds once ``estimate`` returns a list; a stub that returns ``None``
    # makes ``len(rewards)`` raise TypeError.
    assert len(rewards) == len(data)