Refactor: eliminate duplicated code in PDF processing

- Remove ~200 lines of duplicate code between single-file and batch processing
- Consolidate all PDF processing logic into process_single_pdf() function
- Add an optional batch_mode parameter (default False) that selects batch-style or single-file-style output messages
- Single-file and batch modes now use the same code path
- Improves maintainability and reduces the chance of inconsistencies between the two modes

Net reduction: 202 lines deleted, 56 lines added (-146 lines total)
This commit is contained in:
Mac DeCourcy 2025-10-06 17:47:46 -07:00
parent 37267fbf34
commit 2c17d86fe7

View file

@ -320,22 +320,39 @@ def append_markdown(path, md_text):
with open(path, mode) as f:
f.write(md_text.strip() + "\n\n")
def process_single_pdf(pdf_path, height_in, weight_lb, outdir):
"""Process a single PDF file and return success status"""
def process_single_pdf(pdf_path, height_in, weight_lb, outdir, batch_mode=False):
"""Process a single PDF file and return success status
Args:
pdf_path: Path to PDF file
height_in: Height in inches
weight_lb: Weight in pounds (optional)
outdir: Output directory
batch_mode: If True, use batch-style output messages
Returns:
bool: True if successful, False otherwise
"""
try:
# Validate PDF file
pdf_file = Path(pdf_path)
if not pdf_file.exists():
print(f" ❌ Skipping {pdf_path}: File not found", file=sys.stderr)
msg = f" ❌ Skipping {pdf_path}: File not found" if batch_mode else f"❌ Error: PDF file not found: {pdf_path}"
print(msg, file=sys.stderr)
return False
if not pdf_file.is_file():
print(f" ❌ Skipping {pdf_path}: Not a file", file=sys.stderr)
msg = f" ❌ Skipping {pdf_path}: Not a file" if batch_mode else f"❌ Error: Path is not a file: {pdf_path}"
print(msg, file=sys.stderr)
return False
if pdf_file.suffix.lower() != '.pdf':
print(f" ❌ Skipping {pdf_path}: Not a PDF", file=sys.stderr)
msg = f" ❌ Skipping {pdf_path}: Not a PDF" if batch_mode else f"❌ Error: File is not a PDF: {pdf_path}"
print(msg, file=sys.stderr)
return False
print(f"\n📄 Processing: {pdf_file.name}")
if batch_mode:
print(f"\n📄 Processing: {pdf_file.name}")
else:
print("📊 Computing derived metrics...")
# Parse PDF
d = parse_dexa_pdf(pdf_path)
@ -515,11 +532,19 @@ def process_single_pdf(pdf_path, height_in, weight_lb, outdir):
md_text = make_markdown(measured_date, d, derived, total_mass)
append_markdown(os.path.join(outdir, "summary.md"), md_text)
print(f"{pdf_file.name}: Body fat {d.get('body_fat_percent')}%, FFMI {derived.get('ffmi')}")
if batch_mode:
print(f"{pdf_file.name}: Body fat {d.get('body_fat_percent')}%, FFMI {derived.get('ffmi')}")
else:
# Single-file mode prints detailed success info outside this function
pass
return True
except Exception as e:
print(f" ❌ Error processing {pdf_path}: {e}", file=sys.stderr)
if batch_mode:
print(f" ❌ Error processing {pdf_path}: {e}", file=sys.stderr)
else:
print(f"❌ Error reading PDF: {e}", file=sys.stderr)
print("This tool is specifically designed for BodySpec PDF reports.", file=sys.stderr)
return False
def make_markdown(measured_date, d, derived, total_mass):
@ -638,7 +663,7 @@ def main():
except Exception:
pass # If we can't extract date, try to process anyway
if process_single_pdf(str(pdf_file), args.height_in, args.weight_lb, args.outdir):
if process_single_pdf(str(pdf_file), args.height_in, args.weight_lb, args.outdir, batch_mode=True):
success_count += 1
else:
fail_count += 1
@ -657,202 +682,31 @@ def main():
return
# Single file mode
pdf_file = Path(args.pdf)
if not pdf_file.exists():
print(f"❌ Error: PDF file not found: {args.pdf}", file=sys.stderr)
sys.exit(1)
if not pdf_file.is_file():
print(f"❌ Error: Path is not a file: {args.pdf}", file=sys.stderr)
sys.exit(1)
if pdf_file.suffix.lower() != '.pdf':
print(f"❌ Error: File is not a PDF: {args.pdf}", file=sys.stderr)
sys.exit(1)
print(f"📄 Reading PDF: {args.pdf}")
try:
d = parse_dexa_pdf(args.pdf)
except Exception as e:
print(f"❌ Error reading PDF: {e}", file=sys.stderr)
print("This tool is specifically designed for BodySpec PDF reports.", file=sys.stderr)
# Use the shared processing function
success = process_single_pdf(args.pdf, args.height_in, args.weight_lb, args.outdir, batch_mode=False)
if not success:
sys.exit(1)
# Check if critical data was extracted
if d.get("body_fat_percent") is None or d.get("total_mass_lb") is None:
print("⚠️ Warning: Missing critical data from PDF. This may not be a BodySpec report.", file=sys.stderr)
if d.get("body_fat_percent") is None:
print(" - Body Fat % not found", file=sys.stderr)
if d.get("total_mass_lb") is None:
print(" - Total Mass not found", file=sys.stderr)
print("📊 Computing derived metrics...")
measured_date_raw = d.get("measured_date") or datetime.now().strftime("%m/%d/%Y")
measured_date = convert_date_to_iso(measured_date_raw)
total_mass, derived = compute_derived(d, height_in=args.height_in, weight_lb=args.weight_lb)
# Overall CSV row
overall_cols = [
"MeasuredDate","Height_in","Height_ft_in","Weight_lb_Input","DEXA_TotalMass_lb","BodyFat_percent",
"LeanMass_percent","FatMass_lb","LeanSoftTissue_lb","BoneMineralContent_lb","FatFreeMass_lb",
"BMI","FFMI","FMI","LST_Index","ALM_lb","SMI","VAT_Mass_lb","VAT_Volume_in3","VAT_Index",
"BMDI","Android_percent","Gynoid_percent","AG_Ratio","Trunk_to_Limb_Fat_Ratio",
"Arms_Lean_pct","Legs_Lean_pct","Trunk_Lean_pct","Arm_Symmetry_Index","Leg_Symmetry_Index",
"Adjusted_Body_Weight_lb","RMR_cal_per_day"
]
overall_row = {
"MeasuredDate": measured_date,
"Height_in": derived["height_in"],
"Height_ft_in": derived["height_ft_in"],
"Weight_lb_Input": derived["weight_input_lb"],
"DEXA_TotalMass_lb": round(total_mass, 1),
"BodyFat_percent": d.get("body_fat_percent"),
"LeanMass_percent": derived.get("lean_mass_percent"),
"FatMass_lb": d.get("fat_mass_lb"),
"LeanSoftTissue_lb": d.get("lean_soft_tissue_lb"),
"BoneMineralContent_lb": d.get("bmc_lb"),
"FatFreeMass_lb": derived.get("fat_free_mass_lb"),
"BMI": derived["bmi"],
"FFMI": derived.get("ffmi"),
"FMI": derived.get("fmi"),
"LST_Index": derived.get("lsti"),
"ALM_lb": derived.get("alm_lb"),
"SMI": derived.get("smi"),
"VAT_Mass_lb": d.get("vat_mass_lb"),
"VAT_Volume_in3": d.get("vat_volume_in3"),
"VAT_Index": derived.get("vat_index"),
"BMDI": derived.get("bmdi"),
"Android_percent": d.get("android_percent"),
"Gynoid_percent": d.get("gynoid_percent"),
"AG_Ratio": d.get("ag_ratio"),
"Trunk_to_Limb_Fat_Ratio": derived.get("trunk_to_limb_fat_ratio"),
"Arms_Lean_pct": derived.get("arms_lean_pct"),
"Legs_Lean_pct": derived.get("legs_lean_pct"),
"Trunk_Lean_pct": derived.get("trunk_lean_pct"),
"Arm_Symmetry_Index": derived.get("arm_symmetry_index"),
"Leg_Symmetry_Index": derived.get("leg_symmetry_index"),
"Adjusted_Body_Weight_lb": derived.get("adjusted_body_weight_lb"),
"RMR_cal_per_day": d.get("rmr_cal_per_day"),
}
write_or_append_csv(os.path.join(args.outdir, "overall.csv"), overall_row, overall_cols)
# Regional table
regional_cols = ["Region","FatPercent","LeanPercent","TotalMass_lb","FatTissue_lb","LeanTissue_lb","BMC_lb"]
reg_rows = []
for name, r in d.get("regional", {}).items():
# Calculate lean percentage (lean tissue only, not including BMC - matches BodySpec report)
lean_pct = round(100 * r["lean_tissue_lb"] / r["total_mass_lb"], 1) if r["total_mass_lb"] > 0 else None
reg_rows.append({
"Region": name,
"FatPercent": r["fat_percent"],
"LeanPercent": lean_pct,
"TotalMass_lb": r["total_mass_lb"],
"FatTissue_lb": r["fat_tissue_lb"],
"LeanTissue_lb": r["lean_tissue_lb"],
"BMC_lb": r["bmc_lb"],
})
regional_path = os.path.join(args.outdir, "regional.csv")
df_regional = pd.DataFrame(reg_rows, columns=regional_cols)
if os.path.exists(regional_path):
df_regional.to_csv(regional_path, mode="a", header=False, index=False)
else:
df_regional.to_csv(regional_path, index=False)
# Muscle balance
mb_cols = ["Region","FatPercent","TotalMass_lb","FatMass_lb","LeanMass_lb","BMC_lb"]
mb_rows = []
for name, r in d.get("muscle_balance", {}).items():
mb_rows.append({
"Region": name,
"FatPercent": r["fat_percent"],
"TotalMass_lb": r["total_mass_lb"],
"FatMass_lb": r["fat_mass_lb"],
"LeanMass_lb": r["lean_mass_lb"],
"BMC_lb": r["bmc_lb"],
})
mb_path = os.path.join(args.outdir, "muscle_balance.csv")
if os.path.exists(mb_path):
pd.DataFrame(mb_rows).to_csv(mb_path, mode="a", header=False, index=False)
else:
pd.DataFrame(mb_rows).to_csv(mb_path, index=False)
# JSON (overall structured object)
# Convert regional and muscle_balance dicts to arrays
regional_array = []
for name, data in d.get("regional", {}).items():
lean_pct = round(100 * data["lean_tissue_lb"] / data["total_mass_lb"], 1) if data["total_mass_lb"] > 0 else None
regional_array.append({
"region": name,
"fat_percent": data["fat_percent"],
"lean_percent": lean_pct,
"total_mass_lb": data["total_mass_lb"],
"fat_tissue_lb": data["fat_tissue_lb"],
"lean_tissue_lb": data["lean_tissue_lb"],
"bmc_lb": data["bmc_lb"]
})
muscle_balance_array = [
{"region": name, **data}
for name, data in d.get("muscle_balance", {}).items()
]
# Parse the result to show summary info
try:
# Read the latest entry from overall.json to get the summary data
json_path = os.path.join(args.outdir, "overall.json")
if os.path.exists(json_path):
with open(json_path, 'r') as f:
data = json.load(f)
latest = data[-1] if isinstance(data, list) and data else data
measured_date = latest.get("measured_date", "Unknown")
body_fat = latest.get("composition", {}).get("body_fat_percent", "N/A")
ffmi = latest.get("composition", {}).get("derived_indices", {}).get("ffmi", "N/A")
else:
measured_date = body_fat = ffmi = "N/A"
except Exception:
measured_date = body_fat = ffmi = "N/A"
overall_json = {
"measured_date": measured_date,
"anthropometrics": {
"height_in": derived["height_in"],
"height_ft_in": derived["height_ft_in"],
"weight_input_lb": derived["weight_input_lb"],
"dexa_total_mass_lb": round(total_mass, 1),
"adjusted_body_weight_lb": derived.get("adjusted_body_weight_lb"),
"bmi": derived["bmi"]
},
"composition": {
"body_fat_percent": d.get("body_fat_percent"),
"lean_mass_percent": derived.get("lean_mass_percent"),
"fat_mass_lb": d.get("fat_mass_lb"),
"lean_soft_tissue_lb": d.get("lean_soft_tissue_lb"),
"bone_mineral_content_lb": d.get("bmc_lb"),
"fat_free_mass_lb": derived.get("fat_free_mass_lb"),
"derived_indices": {
"ffmi": derived.get("ffmi"),
"fmi": derived.get("fmi"),
"lsti": derived.get("lsti"),
"alm_lb": derived.get("alm_lb"),
"smi": derived.get("smi"),
"bmdi": derived.get("bmdi")
}
},
"regional": regional_array,
"regional_analysis": {
"trunk_to_limb_fat_ratio": derived.get("trunk_to_limb_fat_ratio"),
"lean_mass_distribution": {
"arms_percent": derived.get("arms_lean_pct"),
"legs_percent": derived.get("legs_lean_pct"),
"trunk_percent": derived.get("trunk_lean_pct")
}
},
"muscle_balance": muscle_balance_array,
"symmetry_indices": {
"arm_symmetry_index": derived.get("arm_symmetry_index"),
"leg_symmetry_index": derived.get("leg_symmetry_index")
},
"supplemental": {
"android_percent": d.get("android_percent"),
"gynoid_percent": d.get("gynoid_percent"),
"ag_ratio": d.get("ag_ratio"),
"vat": {
"mass_lb": d.get("vat_mass_lb"),
"volume_in3": d.get("vat_volume_in3"),
"vat_index": derived.get("vat_index")
},
"rmr_cal_per_day": d.get("rmr_cal_per_day")
},
"bone_density": d.get("bone_density", {})
}
write_or_append_json(os.path.join(args.outdir, "overall.json"), overall_json)
# Markdown summary (append)
md_text = make_markdown(measured_date, d, derived, total_mass)
append_markdown(os.path.join(args.outdir, "summary.md"), md_text)
# Success output
print(f"\n✅ Success! Wrote files to: {args.outdir}")
print(" 📁 Files created:")
print(" - overall.csv (time-series data)")
@ -861,8 +715,8 @@ def main():
print(" - overall.json (structured data)")
print(" - summary.md (readable report)")
print(f"\n 📈 Scan date: {measured_date}")
print(f" 💪 Body fat: {d.get('body_fat_percent')}%")
print(f" 🏋️ FFMI: {derived.get('ffmi')}")
print(f" 💪 Body fat: {body_fat}%")
print(f" 🏋️ FFMI: {ffmi}")
if __name__ == "__main__":
main()