diff --git a/data/out/chunk_entropy/proxy_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_mistral_22b.parquet b/data/out/chunk_entropy/proxy_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_mistral_22b.parquet new file mode 100644 index 00000000..51020595 Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_mistral_22b.parquet differ diff --git a/data/out/chunk_entropy/proxy_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_qwen_14b.parquet b/data/out/chunk_entropy/proxy_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_qwen_14b.parquet new file mode 100644 index 00000000..295e9ad7 Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_qwen_14b.parquet differ diff --git a/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_mistral_22b.parquet b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_mistral_22b.parquet new file mode 100644 index 00000000..b92e7a5c Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_mistral_22b.parquet differ diff --git a/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_qwen_14b.parquet b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_qwen_14b.parquet new file mode 100644 index 00000000..b1bd995f Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_qwen_14b.parquet differ diff --git a/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_mistral_22b.parquet b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_mistral_22b.parquet new file mode 100644 index 00000000..d7278893 Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_mistral_22b.parquet differ diff --git a/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_qwen_14b.parquet b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_qwen_14b.parquet new file mode 100644 index 00000000..db8353e0 Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_qwen_14b.parquet differ diff --git a/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_mistral_22b.parquet b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_mistral_22b.parquet new file mode 100644 index 00000000..a5496293 Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_mistral_22b.parquet differ diff --git a/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_qwen_14b.parquet b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_qwen_14b.parquet new file mode 100644 index 00000000..4229c35a Binary files /dev/null and b/data/out/chunk_entropy/proxy_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_qwen_14b.parquet differ diff --git a/data/out/chunk_entropy/student_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_qwen3b_phi4mini.parquet b/data/out/chunk_entropy/student_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_qwen3b_phi4mini.parquet new file mode 100644 index 00000000..f5f067f6 Binary files /dev/null and b/data/out/chunk_entropy/student_entropies/corrected_answer_deepseek_v4_pro_and_others/corrected_answer_deepseek_v4_pro_and_others_qwen3b_phi4mini.parquet differ diff --git a/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_qwen3b_phi4mini.parquet b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_qwen3b_phi4mini.parquet new file mode 100644 index 00000000..fe115506 Binary files /dev/null and b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled/distilled_deepseek_v4_flash_qwen3b_phi4mini.parquet differ diff --git a/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_qwen3b_phi4mini.parquet b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_qwen3b_phi4mini.parquet new file mode 100644 index 00000000..df690b70 Binary files /dev/null and b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_extend_w_large/distilled_deepseek_v4_flash_extend_w_large_qwen3b_phi4mini.parquet differ diff --git a/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_phi4mini.parquet b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_phi4mini.parquet new file mode 100644 index 00000000..f2b1f049 Binary files /dev/null and b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_phi4mini.parquet differ diff --git a/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_qwen_3b.parquet b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_qwen_3b.parquet new file mode 100644 index 00000000..a6867071 Binary files /dev/null and b/data/out/chunk_entropy/student_entropies/deepseek_v4_flash_distilled_w_explained/distilled_w_explained_deepseek_v4_flash_qwen_3b.parquet differ diff --git a/src/core/entropy_dynamics/analysis_combined.py b/src/core/entropy_dynamics/analysis_combined.py index 2023e092..b54a6646 100644 --- a/src/core/entropy_dynamics/analysis_combined.py +++ b/src/core/entropy_dynamics/analysis_combined.py @@ -256,4 +256,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/core/entropy_dynamics/analyzer.py b/src/core/entropy_dynamics/analyzer.py index a0374fdc..c78c8ea9 100644 --- a/src/core/entropy_dynamics/analyzer.py +++ b/src/core/entropy_dynamics/analyzer.py @@ -2,10 +2,10 @@ from pathlib import Path -import matplotlib.pyplot as plt +#import matplotlib.pyplot as plt import numpy as np import pandas as pd -import seaborn as sns +#import seaborn as sns def load_results(path: str | Path) -> pd.DataFrame: diff --git a/src/core/entropy_dynamics/config.py b/src/core/entropy_dynamics/config.py index 135ce90b..5a054376 100644 --- a/src/core/entropy_dynamics/config.py +++ b/src/core/entropy_dynamics/config.py @@ -52,4 +52,4 @@ def out_path(self) -> Path: def results_filename(self) -> str: """Unique filename based on teacher source and role to prevent overwrites.""" stem = Path(self.teacher_reasoning_path).stem - return f"entropy_dynamics_{self.role.value}_{stem}.parquet" \ No newline at end of file + return f"entropy_dynamics_{self.role.value}_{stem}.parquet" diff --git a/src/core/entropy_dynamics/prompt_builder.py b/src/core/entropy_dynamics/prompt_builder.py index 02447688..4b761234 100644 --- a/src/core/entropy_dynamics/prompt_builder.py +++ b/src/core/entropy_dynamics/prompt_builder.py @@ -115,6 +115,11 @@ def _tokenize_with_assistant_prefix( base_ids = tokenizer.apply_chat_template( messages, tokenize=True, add_generation_prompt=True ) + + # FIX: Flatten BatchEncoding if necessary + if not isinstance(base_ids, list): + base_ids = base_ids.input_ids[0] if isinstance(base_ids.input_ids[0], list) else list(base_ids.input_ids) + prefix_ids = tokenizer.encode(assistant_prefix, add_special_tokens=False) return base_ids + prefix_ids diff --git a/src/core/entropy_dynamics/reasoning_loader.py b/src/core/entropy_dynamics/reasoning_loader.py index f27865c7..05b7ed91 100644 --- a/src/core/entropy_dynamics/reasoning_loader.py +++ b/src/core/entropy_dynamics/reasoning_loader.py @@ -27,13 +27,16 @@ def load_teacher_reasoning( tokenizer: PreTrainedTokenizer, min_thinking_tokens: int = 16, ) -> list[TeacherReasoning]: - """Load and tokenize teacher reasoning chains. - """ + """Load and tokenize teacher reasoning chains.""" df = pd.read_parquet(path) + # Маппим distill_reasoning в ожидаемый колонку thinking, если пришел новый датасет + if "distill_reasoning" in df.columns and "thinking" not in df.columns: + df = df.rename(columns={"distill_reasoning": "thinking"}) + if "input" in df.columns and "output" in df.columns: records = _parse_synth_aug(df) - elif "thinking" in df.columns: + elif "thinking" in df.columns: # Сюда теперь зайдет и ваш датасет records = _parse_flat(df) else: raise ValueError( @@ -57,7 +60,6 @@ def load_teacher_reasoning( return results - def _parse_synth_aug(df: pd.DataFrame) -> list[TeacherReasoning]: records: list[TeacherReasoning] = [] @@ -125,4 +127,4 @@ def _safe_literal_eval(s) -> list: result = ast.literal_eval(str(s)) return list(result) if isinstance(result, (list, tuple)) else [] except Exception: - return [] \ No newline at end of file + return [] diff --git a/src/core/entropy_dynamics/run_experiment.py b/src/core/entropy_dynamics/run_experiment.py index 4c7a69e3..d81ea4d6 100644 --- a/src/core/entropy_dynamics/run_experiment.py +++ b/src/core/entropy_dynamics/run_experiment.py @@ -32,7 +32,7 @@ import argparse import sys -from core.entropy_dynamics.analyzer import run_full_analysis +#from core.entropy_dynamics.analyzer import run_full_analysis from core.entropy_dynamics.config import ( EntropyDynamicsConfig, ExperimentRole, InferenceMode, StudentModelConfig, ) @@ -40,14 +40,15 @@ # ── Model presets by role ── STUDENT_MODELS = [ - StudentModelConfig(model_id="/home/dviazhev/complexity-aware-fine-tuning-old/src/models/Qwen2.5-3B-Instruct", label="qwen_3b"), - StudentModelConfig(model_id="/home/dviazhev/qa_finetune/Phi-4-mini-instruct", label="phi4_mini"), -# StudentModelConfig(model_id="meta-llama/Llama-3.2-3B-Instruct", label="llama_3b"), + StudentModelConfig(model_id="Qwen/Qwen2.5-3B", label="qwen_3b"), + StudentModelConfig(model_id="microsoft/Phi-4-mini-instruct", label="phi4_mini"), + StudentModelConfig(model_id="meta-llama/Llama-3.2-3B-Instruct", label="llama_3b"), ] PROXY_MODELS = [ - StudentModelConfig(model_id="/home/dviazhev/recursive_caft/models/Qwen2.5-32B-Instruct", label="qwen_32b"), - StudentModelConfig(model_id="/home/dviazhev/recursive_caft/models/Mistral-Small-24B-Instruct-2501", label="mistral_24b"), + StudentModelConfig(model_id="/mnt/data198/LLM/models/Qwen2.5-32B-Instruct", label="qwen_32b"), + StudentModelConfig(model_id="/mnt/data198/LLM/models/Qwen2.5-14B-Instruct", label="qwen_14b"), + StudentModelConfig(model_id="/mnt/data198/LLM/models/Mistral-Small-3.2-24B-Instruct-2506", label="mistral_24b"), ] @@ -138,10 +139,10 @@ def main(): results_path = config.out_path / config.results_filename if results_path.exists(): print(f"\nRunning analysis on {results_path}...") - run_full_analysis(results_path, config.out_path / "analysis") + # run_full_analysis(results_path, config.out_path / "analysis") else: print(f"Results file not found at {results_path}. Run inference first.") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/core/entropy_dynamics/runner.py b/src/core/entropy_dynamics/runner.py index 3e442b60..693939d5 100644 --- a/src/core/entropy_dynamics/runner.py +++ b/src/core/entropy_dynamics/runner.py @@ -21,16 +21,17 @@ @dataclass class StepResult: - """Single measurement: one student × one question × one k.""" + """Single measurement: one model × one question × one k.""" question_id: str - student_label: str + model_label: str # was "student_label" — now generic + role: str # "student" or "proxy" k: int num_reasoning_tokens: int total_reasoning_tokens: int mode: str answer_entropy: float - student_answer: str - student_correct: bool + model_answer: str # was "student_answer" + model_correct: bool # was "student_correct" gold_answer: str @@ -50,7 +51,7 @@ def save(self, path: Path): class EntropyDynamicsRunner: - """Runs the full entropy dynamics experiment.""" + """Runs the full entropy dynamics experiment for any role (student or proxy).""" def __init__(self, config: EntropyDynamicsConfig): self.config = config @@ -58,10 +59,8 @@ def __init__(self, config: EntropyDynamicsConfig): def run(self) -> pd.DataFrame: set_seed() - # Load teacher reasoning with a lightweight tokenizer - # (any tokenizer sharing the vocab works for slicing) - first_student_id = self.config.students[0].model_id - loader_tokenizer = AutoTokenizer.from_pretrained(first_student_id) + first_model_id = self.config.students[0].model_id + loader_tokenizer = AutoTokenizer.from_pretrained(first_model_id) print(f"Loading teacher reasoning from {self.config.teacher_reasoning_path}...") samples = load_teacher_reasoning( @@ -73,48 +72,53 @@ def run(self) -> pd.DataFrame: all_results = ExperimentResults() - for student_cfg in self.config.students: - self._run_single_student(student_cfg, samples, all_results) + for model_cfg in self.config.students: + self._run_single_model(model_cfg, samples, all_results) - out_path = self.config.out_path / "entropy_dynamics_results.parquet" + # ── Use unique filename to prevent overwrites ── + out_path = self.config.out_path / self.config.results_filename all_results.save(out_path) print(f"All results saved to {out_path}") return all_results.to_dataframe() - def _run_single_student( + def _run_single_model( self, - student_cfg: StudentModelConfig, + model_cfg: StudentModelConfig, samples: list[TeacherReasoning], results: ExperimentResults, ): + role = self.config.role.value print(f"\n{'='*60}") - print(f"Student: {student_cfg.label} ({student_cfg.model_id})") + print(f"[{role}] {model_cfg.label} ({model_cfg.model_id})") print(f"{'='*60}") - tokenizer = AutoTokenizer.from_pretrained(student_cfg.model_id) + tokenizer = AutoTokenizer.from_pretrained(model_cfg.model_id) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token model = AutoModelForCausalLM.from_pretrained( - student_cfg.model_id, + model_cfg.model_id, device_map=DEVICE_MAP, torch_dtype=torch.bfloat16, ) model.eval() + # ── Checkpoint includes role to avoid collision ── checkpoint_path = ( - self.config.out_path / f"checkpoint_{student_cfg.label}_{self.config.mode.value}.parquet" + self.config.out_path + / f"checkpoint_{role}_{model_cfg.label}_{self.config.mode.value}.parquet" ) processed_ids = _load_processed_ids(checkpoint_path) print(f"Resuming: {len(processed_ids)} questions already processed.") t_start = time.perf_counter() - for i, sample in enumerate(tqdm(samples, desc=f"[{student_cfg.label}]")): + for i, sample in enumerate(tqdm(samples, desc=f"[{role}/{model_cfg.label}]")): if sample.question_id in processed_ids: continue + # Re-tokenize with this model's tokenizer sample.thinking_token_ids = tokenizer.encode( sample.thinking_text, add_special_tokens=False ) @@ -128,19 +132,19 @@ def _run_single_student( ) for prompt in prefixed_prompts: - step_result = self._run_single_step(model, tokenizer, prompt, student_cfg.label) + step_result = self._run_single_step( + model, tokenizer, prompt, model_cfg.label, role + ) results.append(step_result) - # Checkpoint if (i + 1) % self.config.batch_save_every == 0: results.save(checkpoint_path) elapsed = time.perf_counter() - t_start - print(f" [{student_cfg.label}] {i+1}/{len(samples)} " + print(f" [{role}/{model_cfg.label}] {i+1}/{len(samples)} " f"({elapsed:.0f}s elapsed)") results.save(checkpoint_path) - # Free del model gc.collect() if torch.cuda.is_available(): @@ -152,25 +156,25 @@ def _run_single_step( model, tokenizer: PreTrainedTokenizer, prompt: PrefixedPrompt, - student_label: str, + model_label: str, + role: str, ) -> StepResult: """Run one forward pass and extract entropy.""" - input_ids = torch.tensor([prompt.input_ids], device=DEVICE) attention_mask = torch.ones_like(input_ids) if prompt.mode == InferenceMode.FORCED: return self._step_forced( - model, tokenizer, input_ids, attention_mask, prompt, student_label + model, tokenizer, input_ids, attention_mask, prompt, model_label, role ) else: return self._step_continuation( - model, tokenizer, input_ids, attention_mask, prompt, student_label + model, tokenizer, input_ids, attention_mask, prompt, model_label, role ) def _step_forced( self, model, tokenizer, input_ids, attention_mask, - prompt: PrefixedPrompt, student_label: str, + prompt: PrefixedPrompt, model_label: str, role: str, ) -> StepResult: """Mode A: generate 1 token, measure its entropy.""" outputs = model.generate( @@ -188,26 +192,27 @@ def _step_forced( entropy = compute_entropy_from_logits(first_token_logits).item() generated_id = outputs.sequences[0, input_ids.shape[1]].item() - student_answer = tokenizer.decode([generated_id]).strip().lower() + answer = tokenizer.decode([generated_id]).strip().lower() return StepResult( question_id=prompt.question_id, - student_label=student_label, + model_label=model_label, + role=role, k=prompt.k, num_reasoning_tokens=prompt.num_reasoning_tokens, total_reasoning_tokens=prompt.total_reasoning_tokens, mode=prompt.mode.value, answer_entropy=entropy, - student_answer=student_answer, - student_correct=(student_answer == prompt.gold_answer), + model_answer=answer, + model_correct=(answer == prompt.gold_answer), gold_answer=prompt.gold_answer, ) def _step_continuation( self, model, tokenizer, input_ids, attention_mask, - prompt: PrefixedPrompt, student_label: str, + prompt: PrefixedPrompt, model_label: str, role: str, ) -> StepResult: - """Mode B: student continues generating, measure avg entropy of tail.""" + """Mode B: model continues generating, measure avg entropy of tail.""" outputs = model.generate( input_ids=input_ids, attention_mask=attention_mask, @@ -222,11 +227,11 @@ def _step_continuation( gen_ids = outputs.sequences[0, input_ids.shape[1]:] gen_text = tokenizer.decode(gen_ids, skip_special_tokens=True) - student_answer = "" - marker_pos = gen_text.find(answer_marker[1]) # find "]]" + answer = "" + marker_pos = gen_text.find(answer_marker[1]) marker_start = gen_text.rfind(answer_marker[0], 0, marker_pos if marker_pos != -1 else None) if marker_start != -1 and marker_pos != -1: - student_answer = gen_text[marker_start + len(answer_marker[0]):marker_pos].strip().lower() + answer = gen_text[marker_start + len(answer_marker[0]):marker_pos].strip().lower() scores = outputs.scores if not scores: @@ -243,20 +248,20 @@ def _step_continuation( return StepResult( question_id=prompt.question_id, - student_label=student_label, + model_label=model_label, + role=role, k=prompt.k, num_reasoning_tokens=prompt.num_reasoning_tokens, total_reasoning_tokens=prompt.total_reasoning_tokens, mode=prompt.mode.value, answer_entropy=entropy, - student_answer=student_answer, - student_correct=(student_answer == prompt.gold_answer), + model_answer=answer, + model_correct=(answer == prompt.gold_answer), gold_answer=prompt.gold_answer, ) def _load_processed_ids(checkpoint_path: Path) -> set[str]: - """Load question_ids already processed from a checkpoint parquet.""" if not checkpoint_path.exists(): return set() try: diff --git a/src/core/entropy_dynamics/runner_vllm.py b/src/core/entropy_dynamics/runner_vllm.py index 56bf9a7f..7141e9f1 100644 --- a/src/core/entropy_dynamics/runner_vllm.py +++ b/src/core/entropy_dynamics/runner_vllm.py @@ -21,25 +21,6 @@ --out_dir artifacts/entropy_dynamics/mmlu_forced \\ --role proxy --mode forced --students qwen_32b \\ --use_vllm --gpu_memory_utilization 0.85 - -My script for starting: -env["PYTHONPATH"] = "/home/dviazhev/recursive_caft/src" -env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" -env["TOKENIZERS_PARALLELISM"] = "false" -env["VLLM_USE_V1"] = "0" - -cmd = [ - "python", "-m", "core.entropy_dynamics.run_experiment", - "--teacher_reasoning_path", "/home/dviazhev/recursive_caft/data/out/distillation/mmlu_synth_gptoss_b_t0_8.parquet", - "--out_dir", "artifacts/entropy_dynamics/gpt_b_proxy", - "--role", "proxy", - "--mode", "forced", - "--dataset_type", "mmlu", - "--students", "qwen_32b", "mistral_24b", - "--use_vllm", - "--gpu_memory_utilization", "0.90", - "--max_model_len", "8192", - "--tensor_parallel_size", "0", # 0 = auto-detect """ from __future__ import annotations @@ -55,6 +36,8 @@ import torch from tqdm import tqdm from transformers import AutoTokenizer +from vllm import TokensPrompt + from core.entropy_dynamics.config import EntropyDynamicsConfig, StudentModelConfig from core.entropy_dynamics.prompt_builder import PrefixedPrompt, build_prefixed_prompts @@ -130,6 +113,91 @@ def _save_prompt_cache(path: Path, prompts: list[PrefixedPrompt]) -> None: print(f" Prompt cache saved: {path.name} ({len(prompts)} prompts)") + +# --------------------------------------------------------------------------- +# Parallel prompt building +# --------------------------------------------------------------------------- + +def _build_one_sample(args: tuple) -> list: + """Worker function: tokenize one sample and build all its prefixed prompts. + + Receives a plain tuple (not a TeacherReasoning) to be picklable across + processes. Returns a list of dicts that can be reconstructed into + PrefixedPrompt objects in the main process. + """ + from transformers import AutoTokenizer + from core.entropy_dynamics.prompt_builder import build_prefixed_prompts + from core.entropy_dynamics.reasoning_loader import TeacherReasoning + + (question_id, question, options, gold_answer, thinking_text, + model_id, window_size, mode, dataset_type) = args + + tokenizer = AutoTokenizer.from_pretrained(model_id) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + + sample = TeacherReasoning( + question_id=question_id, + question=question, + options=options, + gold_answer=gold_answer, + thinking_text=thinking_text, + thinking_token_ids=tokenizer.encode(thinking_text, add_special_tokens=False), + ) + + prompts = build_prefixed_prompts( + sample=sample, + tokenizer=tokenizer, + window_size=window_size, + mode=mode, + dataset_type=dataset_type, + ) + return prompts + + +def _build_prompts_parallel( + samples: list, + model_id: str, + window_size: int, + mode, + dataset_type: str, + num_workers: int = 16, # 0 = auto (cpu_count / 2) +) -> list: + """Build all prefixed prompts in parallel using ProcessPoolExecutor. + + Each worker loads its own tokenizer instance (necessary because + HuggingFace tokenizers are not fork-safe). Workers are spawned fresh, + so GPU memory from the main process is not duplicated. + """ + import multiprocessing + from concurrent.futures import ProcessPoolExecutor, as_completed + + if num_workers <= 0: + num_workers = max(1, multiprocessing.cpu_count() // 2) + + print(f"Building prompts for {len(samples)} questions " + f"(parallel, {num_workers} workers)...") + + args_list = [ + ( + s.question_id, s.question, s.options, s.gold_answer, + s.thinking_text, model_id, window_size, mode, dataset_type, + ) + for s in samples + ] + + all_prompts = [] + with ProcessPoolExecutor(max_workers=num_workers, + mp_context=__import__("multiprocessing").get_context("spawn")) as pool: + futures = {pool.submit(_build_one_sample, a): i for i, a in enumerate(args_list)} + done = 0 + for fut in tqdm(as_completed(futures), total=len(futures), desc="Building prompts"): + all_prompts.extend(fut.result()) + done += 1 + + print(f" Built {len(all_prompts)} prompts total.") + return all_prompts + # --------------------------------------------------------------------------- # Runner # --------------------------------------------------------------------------- @@ -241,19 +309,13 @@ def _run_single_model( all_prompts = _load_prompt_cache(cache_path) if all_prompts is None: - print(f"Building prompts for {len(remaining)} questions...") - all_prompts = [] - for sample in tqdm(remaining, desc="Building prompts"): - sample.thinking_token_ids = tokenizer.encode( - sample.thinking_text, add_special_tokens=False - ) - all_prompts.extend(build_prefixed_prompts( - sample=sample, - tokenizer=tokenizer, - window_size=self.config.window_size, - mode=self.config.mode, - dataset_type=self.config.dataset_type, - )) + all_prompts = _build_prompts_parallel( + samples=remaining, + model_id=model_cfg.model_id, + window_size=self.config.window_size, + mode=self.config.mode, + dataset_type=self.config.dataset_type, + ) _save_prompt_cache(cache_path, all_prompts) else: before = len(all_prompts) @@ -336,9 +398,10 @@ def _run_single_model( for batch_start in range(0, total, CHECKPOINT_EVERY): batch_prompts = all_prompts[batch_start:batch_start + CHECKPOINT_EVERY] batch_token_ids = [p.input_ids for p in batch_prompts] + vllm_inputs = [TokensPrompt(prompt_token_ids=ids) for ids in batch_token_ids] batch_outputs = llm.generate( - prompt_token_ids=batch_token_ids, + prompts=vllm_inputs, sampling_params=sampling_params, use_tqdm=True, )