from .Background_Class import Background
from .Signal_Class import Signal
import os
import re
import shutil
import numpy as np
from scipy.optimize import minimize, curve_fit
from scipy.integrate import dblquad
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.cm import get_cmap
from matplotlib.colors import Normalize
from iminuit import Minuit
from iminuit.cost import ExtendedUnbinnedNLL, BinnedNLL
from tabulate import tabulate
from tqdm import tqdm
from tqdm import trange
from scipy.stats import norm
from sweights import SWeight
[docs]
class Signal_Background:
"""
Combined Signal and Background probability distribution.
This class models a mixture distribution defined by:
S(X, Y) = f * Signal(X, Y) + (1-f) * Background(X, Y)
The Signal distribution is defined as the product of a CrystalBall distribution (X dimension)
and an ExponentialDecay distribution (Y dimension).
The Background distribution is defined as the product of a Uniform distribution (X dimension)
and a Normal distribution (Y dimension).
Parameters
----------
mu : float
The mean of the CrystalBall distribution in the X dimension for the Signal component.
sigma : float
The standard deviation of the CrystalBall distribution in the X dimension for the Signal component.
beta : float
The threshold value of the CrystalBall distribution in the X dimension for the Signal component.
Must be beta > 0.
m : float
The power-law tail exponent of the CrystalBall distribution in the X dimension for the Signal component.
Must be m > 1.
lamb : float
The decay constant (rate) of the ExponentialDecay distribution in the Y dimension for the Signal component.
Must be lamb > 0.
mu_b : float
The mean of the Normal distribution in the Y dimension for the Background component.
sigma_b : float
The standard deviation of the Normal distribution in the Y dimension for the Background component.
Must be sigma_b > 0.
f : float
The fraction of the Signal distribution in the mixture. Must be in the range [0, 1].
lower_bound_X : float
The lower bound of the Uniform distribution in the X dimension for the Background component
and the truncation of the CrystalBall distribution in the Signal component.
upper_bound_X : float
The upper bound of the Uniform distribution in the X dimension for the Background component
and the truncation of the CrystalBall distribution in the Signal component.
Must satisfy lower_bound_X < upper_bound_X.
lower_bound_Y : float, optional
The lower bound of both the ExponentialDecay distribution (Signal component)
and the Normal distribution (Background component) in the Y dimension. Default is None.
upper_bound_Y : float, optional
The upper bound of both the ExponentialDecay distribution (Signal component)
and the Normal distribution (Background component) in the Y dimension. Default is None.
Raises
------
ValueError
If f is not in the range [0, 1].
If beta <= 0.
If m <= 1.
If lamb <= 0.
If sigma_b <= 0.
If lower_bound_X >= upper_bound_X.
If lower_bound_Y >= upper_bound_Y.
"""
[docs]
def __init__(self, mu, sigma, beta, m, lamb, mu_b, sigma_b, f, lower_bound_X, upper_bound_X, lower_bound_Y=None, upper_bound_Y=None):
# Defined the fraction between the signal and background contributions
if f < 0 or f > 1:
raise ValueError("f must be in the range [0, 1]")
self.f = f
# Other errors are automatically raised by the underlying distributions
try:
self.Signal = Signal(mu, sigma, beta, m, lamb, lower_bound_X, upper_bound_X, lower_bound_Y, upper_bound_Y)
self.Background = Background(mu_b, sigma_b, lower_bound_X, upper_bound_X, lower_bound_Y, upper_bound_Y)
except ValueError as e:
raise ValueError(f"Error when initialising Signal_Background distribution: {e}")
self.true_params = [mu, sigma, beta, m, lamb, mu_b, sigma_b, f]
self.samples = None
self.lower_bound_X = lower_bound_X
self.upper_bound_X = upper_bound_X
self.lower_bound_Y = lower_bound_Y
self.upper_bound_Y = upper_bound_Y
# If both lower and upper bounds are defined, find the maximum value of the PDF in this region from the start
if self.lower_bound_X is not None and self.upper_bound_X is not None:
self.max_pdf = self._find_max_pdf()
[docs]
def pdf(self, X, Y):
"""
Calculate the joint Probability Density Function (PDF).
The Joint PDF is defined as:
S_B(X, Y) = f * Signal_PDF(X, Y) + (1-f) * Background_PDF(X, Y)
Parameters
----------
X : float or np.ndarray
The value(s) of X at which to evaluate the PDF.
Y : float or np.ndarray
The value(s) of Y at which to evaluate the PDF.
Returns
-------
float or np.ndarray
The normalized joint PDF value(s)
0 if X is outside [lower_bound_X, upper_bound_X] or Y is outside [lower_bound_Y, upper_bound_Y].
"""
return self.f * self.Signal.pdf(X, Y) + (1 - self.f) * self.Background.pdf(X, Y)
[docs]
def cdf(self, X, Y):
"""
Compute the joint Cumulative Distribution Function (CDF).
The Joint CDF is defined as:
C(X, Y) = Integral of S_B(X,Y) from 0, X and 0, Y
As the distributions are independent, the joint CDF is the product of the individual CDFs:
C(X, Y) = f * Signal_CDF(X, Y) + (1-f) * Background_CDF(X, Y)
Parameters
----------
X : float or np.ndarray
The value(s) of X at which to evaluate the CDF.
Y : float or np.ndarray
The value(s) of Y at which to evaluate the CDF.
Returns
-------
float or np.ndarray
The joint CDF value(s),
"""
return self.f * self.Signal.cdf(X, Y) + (1 - self.f) * self.Background.cdf(X, Y)
[docs]
def _find_max_pdf(self):
"""
Finds the maximum value of the joint PDF by first using a rough grid search and then a local optimisation.
Only performed if both the lower and upper bounds are defined.
Returns
-------
float
The maximum value of the joint PDF within the defined bounds.
"""
# Coarse grid search
# Make use of numpy meshgrid to create a grid of points
# Allows the use of vectorized operations
x_vals = np.linspace(self.lower_bound_X, self.upper_bound_X, 10)
y_vals = np.linspace(self.lower_bound_Y, self.upper_bound_Y, 10)
X, Y = np.meshgrid(x_vals, y_vals)
Z = self.pdf(X, Y)
# Find the maximum value of the PDF over the grid
max_index = np.unravel_index(np.argmax(Z), Z.shape)
# Returns the X and Y values of the maximum point to use as the starting point for the local optimization
best_start = (X[max_index], Y[max_index])
# Local optimization starting from the best grid point
def neg_pdf(point):
x, y = point
return -self.pdf(x, y)
# Use scipy.optimize.minimize to find the minimium value of the negetive PDF
# Limited-memory Broyden–Fletcher–Goldfarb–Shanno with Box constraint
# Quick and precise for solving bounded optimization problems
min_result = minimize(neg_pdf,best_start, bounds=[(self.lower_bound_X, self.upper_bound_X), (self.lower_bound_Y, self.upper_bound_Y)], method="L-BFGS-B")
if min_result.success:
max_pdf = -min_result.fun
print(f"Maximum PDF value found: {max_pdf}")
else:
raise RuntimeError("Optimisation failed")
return max_pdf
[docs]
def normalisation_check(self, over_whole_plane=False):
"""
Check the normalization of the joint Probability Density Function (PDF) using numerical integration.
This method performs numerical integration with `scipy.integrate.dblquad` to ensure that the joint PDF
integrates to 1. It supports both truncated and untruncated cases.
Parameters
----------
over_whole_plane : bool, optional
If True, perform integration over the entire real plane (-infinity to infinity) for both X and Y.
Default is False, in which case integration is only performed over the defined/truncated region.
Returns
-------
Normalisation results for:
- The defined/truncated region: [lower_bound_X, upper_bound_X] for X and [lower_bound_Y, upper_bound_Y] for Y.
- The entire real plane: X in [-infinity, infinity] and Y in [-infinity, infinity] (only if `over_whole_plane` is True).
Notes
-----
- If the PDF is truncated, the method integrates over the truncated region defined by the bounds (`lower_bound_X`, `upper_bound_X`, `lower_bound_Y`, `upper_bound_Y`).
- If no bounds are defined, the truncated region defaults to the entire real plane.
- The integration over the entire real plane is computationally intensive and can be skipped by setting `over_whole_plane` to False.
"""
# Set the limits for the integration, if None remains it will not pass into integration
if self.lower_bound_X is not None:
lower_bound_X = self.lower_bound_X
else:
lower_bound_X = -np.inf
if self.upper_bound_X is not None:
upper_bound_X = self.upper_bound_X
else:
upper_bound_X = np.inf
if self.lower_bound_Y is not None:
lower_bound_Y = self.lower_bound_Y
else:
lower_bound_Y = -np.inf
if self.upper_bound_Y is not None:
upper_bound_Y = self.upper_bound_Y
else:
upper_bound_Y = np.inf
if (self.lower_bound_X is not None) or (self.upper_bound_X is not None) or (self.lower_bound_Y is not None) or (self.upper_bound_Y is not None):
print(f"Normalisation over the region the PDF is defined/truncated: [{lower_bound_X}, {upper_bound_X}] in X, [{lower_bound_Y}, {upper_bound_Y}] in Y")
# have to use constant `lambda` functions for the y limits of integration due to format of
integral_bounds, error_bounds = dblquad(lambda y, x: self.pdf(x, y), lower_bound_X, upper_bound_X, lambda x: lower_bound_Y, lambda x: upper_bound_Y)
print(f"Integral: {integral_bounds} \u00B1 {error_bounds}")
if over_whole_plane:
print(f"Normalisation over the whole real plane: X in [-infinity, infinity], Y in [-infinity, infinity]")
integral_inf, error_inf = dblquad(lambda y, x: self.pdf(x, y), -np.inf, np.inf, lambda x: -np.inf, lambda x: np.inf)
print(f"Integral: {integral_inf} \u00B1 {error_inf}")
[docs]
def plot_dist(self):
"""
Visualize the joint Probability Density Function (PDF) in 2D and 3D.
LHS: A 2D contour plot with a color bar to represent the PDF values.
RHS: A 3D surface plot to show the PDF as a function of X and Y.
The X and Y ranges for the plots are determined based on the bounds provided for the PDF, with slight overextensions to display regions of zero probability.
"""
# The X values the PDF will span over - slight overextension to show zero probability regions
X = np.linspace(self.lower_bound_X - 0.05*(self.upper_bound_X - self.lower_bound_X), self.upper_bound_X + 0.05*(self.upper_bound_X-self.lower_bound_X), 1000)
# All the cases in which the Y values will span over
# Account for case where no bounds, one bound or both bounds are given
# slight overextension to show zero probability regions
if self.lower_bound_Y is not None and self.upper_bound_Y is not None:
Y = np.linspace(self.lower_bound_Y - 0.05*(self.upper_bound_Y - self.lower_bound_Y), self.upper_bound_Y + 0.05*(self.upper_bound_Y-self.lower_bound_Y), 1000)
elif self.lower_bound_Y is not None and self.upper_bound_Y is None:
Y = np.linspace(self.lower_bound_Y, self.lower_bound_Y + 6*self.true_params[6], 1000)
elif self.lower_bound_Y is None and self.upper_bound_Y is not None:
Y = np.linspace(self.upper_bound_Y - 6*self.true_params[6], self.upper_bound_Y, 1000)
else:
Y = np.linspace(self.true_params[5] - 3*self.true_params[6], self.true_params[5] + 3*self.true_params[6], 1000)
# Meshgrid for the 3D plot
X, Y = np.meshgrid(X, Y)
Z = self.pdf(X, Y)
fig = plt.figure(figsize=(16, 7))
spec = gridspec.GridSpec(1, 2, width_ratios=[1, 1])
label_fontsize = 20
tick_fontsize = 18
colorbar_fontsize = 14
# LHS: Contour plot with a color bar to s
ax1 = fig.add_subplot(spec[0, 0])
contour = ax1.contourf(X, Y, Z, levels=50, cmap='viridis')
ax1.set_xlabel('X', fontsize=label_fontsize, labelpad=12)
ax1.set_ylabel('Y', fontsize=label_fontsize, labelpad=12)
ax1.tick_params(axis='both', labelsize=tick_fontsize)
colorbar = fig.colorbar(contour, ax=ax1, shrink=0.8)
colorbar.ax.tick_params(labelsize=colorbar_fontsize)
# RHS: 3D surface plot
ax2 = fig.add_subplot(spec[0, 1], projection='3d')
ax2.plot_surface(X, Y, Z, cmap='viridis', alpha=0.9)
ax2.set_xlabel('X', fontsize=label_fontsize, labelpad=15)
ax2.set_ylabel('Y', fontsize=label_fontsize, labelpad=15)
ax2.set_zlabel('Total PDF', fontsize=label_fontsize, labelpad=15)
ax2.tick_params(axis='both', labelsize=tick_fontsize)
ax2.view_init(elev=30, azim=60)
plt.tight_layout()
plt.show()
[docs]
def marginal_pdf_x(self, X):
"""
Calculate the marginal Probability Density Function (PDF) in the X dimension.
Integral of the joint PDF over the Y dimension, to remove the Y dependence.
The marginal PDF is defined as:
S_B(X) = Integral of S_B(X, Y) wrt Y over [-infinity, infinity]
Parameters
----------
X : float or np.ndarray
The value(s) of X at which to evaluate the PDF.
Returns
-------
tuple of (float or np.ndarray, float or np.ndarray, float or np.ndarray)
A tuple containing:
- The signal component of the marginal PDF.
- The background component of the marginal PDF.
- The total marginal PDF (signal + background).
"""
# Use pre defined X component of the Signal and Background PDFs to calculate the marginal PDF
signal_cont_pdf = self.f*self.Signal.X.pdf(X)
background_cont_pdf = (1 - self.f)*self.Background.X.pdf(X)
# Return the signal, background and total PDF values seperately for plotting
return signal_cont_pdf, background_cont_pdf, signal_cont_pdf + background_cont_pdf
[docs]
def marginal_cdf_x(self, X):
"""
Calculate the marginal Cumulative Distribution Function (CDF) in the X dimension.
Integral of the joint CDF over the Y dimension, to remove the Y dependence.
The marginal CDF is defined as:
C(X) = Integral of C_B(X, Y) wrt Y over [-infinity, infinity]
Parameters
----------
X : float or np.ndarray
The value(s) of X at which to evaluate the CDF.
Returns
-------
tuple of (float or np.ndarray, float or np.ndarray, float or np.ndarray)
A tuple containing:
- The signal component of the marginal CDF.
- The background component of the marginal CDF.
- The total marginal CDF (signal + background).
"""
# Use pre defined X component of the Signal and Background CDFs to calculate the marginal CDF
signal_cont_cdf = self.f*self.Signal.X.cdf(X)
background_cont_cdf = (1 - self.f)*self.Background.X.cdf(X)
# Return the signal, background and total PDF values seperately for plotting
return signal_cont_cdf, background_cont_cdf, signal_cont_cdf + background_cont_cdf
[docs]
def marginal_pdf_y(self, Y):
"""
Calculate the marginal Probability Density Function (PDF) in the Y dimension.
Integral of the joint PDF over the X dimension, to remove the X dependence.
The marginal PDF is defined as:
S_B(Y) = Integral of S_B(X, Y) wrt X over [-infinity, infinity]
Parameters
----------
Y : float or np.ndarray
The value(s) of Y at which to evaluate the PDF.
Returns
-------
tuple of (float or np.ndarray, float or np.ndarray, float or np.ndarray)
A tuple containing:
- The signal component of the marginal PDF.
- The background component of the marginal PDF.
- The total marginal PDF (signal + background).
"""
# Use pre defined Y component of the Signal and Background PDFs to calculate the marginal PDF
signal_cont_pdf = self.f*self.Signal.Y.pdf(Y)
background_cont_pdf = (1 - self.f)*self.Background.Y.pdf(Y)
# Return the signal, background and total PDF values seperately for plotting
return signal_cont_pdf, background_cont_pdf, signal_cont_pdf + background_cont_pdf
[docs]
def marginal_cdf_y(self, Y):
"""
Calculate the marginal Cumulative Distribution Function (CDF) in the Y dimension.
Integral of the joint CDF over the X dimension, to remove the X dependence.
The marginal CDF is defined as:
C(Y) = Integral of C_B(X, Y) wrt X over [-infinity, infinity]
Parameters
----------
Y : float or np.ndarray
The value(s) of Y at which to evaluate the CDF.
Returns
-------
tuple of (float or np.ndarray, float or np.ndarray, float or np.ndarray)
A tuple containing:
- The signal component of the marginal CDF.
- The background component of the marginal CDF.
- The total marginal CDF (signal + background).
"""
# Use pre defined Y component of the Signal and Background CDFs to calculate the marginal CDF
signal_cont_cdf = self.f*self.Signal.Y.cdf(Y)
background_cont_cdf = (1 - self.f)*self.Background.Y.cdf(Y)
# Return the signal, background and total PDF values seperately for plotting
return signal_cont_cdf, background_cont_cdf, signal_cont_cdf + background_cont_cdf
[docs]
def plot_marginal(self):
"""
Create a 2x2 grid of plots:
- Top left: marginal_pdf_x with signal and background contributions and overall
- Top right: marginal_cdf_x with signal and background contributions and overall
- Bottom left: marginal_pdf_y with signal and background contributions and overall
- Bottom right: marginal_cdf_y with signal and background contributions and overall
"""
X = np.linspace(self.lower_bound_X - 0.05*(self.upper_bound_X - self.lower_bound_X), self.upper_bound_X + 0.05*(self.upper_bound_X-self.lower_bound_X), 1000)
# given all options to support when only one bound is given
if self.lower_bound_Y is not None and self.upper_bound_Y is not None:
Y = np.linspace(self.lower_bound_Y - 0.05*(self.upper_bound_Y - self.lower_bound_Y), self.upper_bound_Y + 0.05*(self.upper_bound_Y-self.lower_bound_Y), 1000)
elif self.lower_bound_Y is not None and self.upper_bound_Y is None:
Y = np.linspace(self.lower_bound_Y, self.lower_bound_Y + 6*self.true_params[6], 100)
elif self.lower_bound_Y is None and self.upper_bound_Y is not None:
Y = np.linspace(self.upper_bound_Y - 6*self.true_params[6], self.upper_bound_Y, 100)
else:
Y = np.linspace(self.true_params[5] - 3*self.true_params[6], self.true_params[5] + 3*self.true_params[6], 1000)
fig, axs = plt.subplots(2, 2, figsize=(16, 12))
# Predetermined font sizes and line widths
label_fontsize = 16
tick_fontsize = 14
legend_fontsize = 14
line_width = 2
alpha_value = 0.8
# Top left: marginal_pdf_x
signal_pdf_x, background_pdf_x, total_pdf_x = self.marginal_pdf_x(X)
axs[0, 0].plot(X, signal_pdf_x, label='Signal PDF: Crystal Ball', color='green', linewidth=line_width, alpha=alpha_value)
axs[0, 0].plot(X, background_pdf_x, label='Background PDF: Uniform', color='blue', linewidth=line_width, alpha=alpha_value)
axs[0, 0].plot(X, total_pdf_x, label='Total PDF', color='red', linewidth=line_width, alpha=alpha_value)
axs[0, 0].set_xlabel('X', fontsize=label_fontsize)
axs[0, 0].set_ylabel(f'Marginal PDF in X (f = {self.f})', fontsize=label_fontsize)
axs[0, 0].tick_params(axis='both', which='major', labelsize=tick_fontsize)
axs[0, 0].legend(fontsize=legend_fontsize)
axs[0, 0].grid(linestyle='--', alpha=0.6)
# Top right: marginal_cdf_x
signal_cdf_x, background_cdf_x, total_cdf_x = self.marginal_cdf_x(X)
axs[0, 1].plot(X, signal_cdf_x, label='Signal CDF: Crystal Ball', color='green', linewidth=line_width, alpha=alpha_value)
axs[0, 1].plot(X, background_cdf_x, label='Background CDF: Uniform', color='blue', linewidth=line_width, alpha=alpha_value)
axs[0, 1].plot(X, total_cdf_x, label='Total CDF', color='red', linewidth=line_width, alpha=alpha_value)
axs[0, 1].set_xlabel('X', fontsize=label_fontsize)
axs[0, 1].set_ylabel(f'Marginal CDF in X (f = {self.f})', fontsize=label_fontsize)
axs[0, 1].tick_params(axis='both', which='major', labelsize=tick_fontsize)
axs[0, 1].legend(fontsize=legend_fontsize)
axs[0, 1].grid(linestyle='--', alpha=0.6)
# Bottom left: marginal_pdf_y
signal_pdf_y, background_pdf_y, total_pdf_y = self.marginal_pdf_y(Y)
axs[1, 0].plot(Y, signal_pdf_y, label='Signal PDF: Exponential', color='green', linewidth=line_width, alpha=alpha_value)
axs[1, 0].plot(Y, background_pdf_y, label='Background PDF: Normal', color='blue', linewidth=line_width, alpha=alpha_value)
axs[1, 0].plot(Y, total_pdf_y, label='Total PDF', color='red', linewidth=line_width, alpha=alpha_value)
axs[1, 0].set_xlabel('Y', fontsize=label_fontsize)
axs[1, 0].set_ylabel(f'Marginal PDF in Y (f = {self.f})', fontsize=label_fontsize)
axs[1, 0].tick_params(axis='both', which='major', labelsize=tick_fontsize)
axs[1, 0].legend(fontsize=legend_fontsize)
axs[1, 0].grid(linestyle='--', alpha=0.6)
# Bottom right: marginal_cdf_y
signal_cdf_y, background_cdf_y, total_cdf_y = self.marginal_cdf_y(Y)
axs[1, 1].plot(Y, signal_cdf_y, label='Signal CDF: Exponential', color='green', linewidth=line_width, alpha=alpha_value)
axs[1, 1].plot(Y, background_cdf_y, label='Background CDF: Normal', color='blue', linewidth=line_width, alpha=alpha_value)
axs[1, 1].plot(Y, total_cdf_y, label='Total CDF', color='red', linewidth=line_width, alpha=alpha_value)
axs[1, 1].set_xlabel('Y', fontsize=label_fontsize)
axs[1, 1].set_ylabel(f'Marginal CDF in Y (f = {self.f})', fontsize=label_fontsize)
axs[1, 1].tick_params(axis='both', which='major', labelsize=tick_fontsize)
axs[1, 1].legend(fontsize=legend_fontsize)
axs[1, 1].grid(linestyle='--', alpha=0.6)
plt.tight_layout()
plt.show()
[docs]
def accept_reject_sample(self, desired_samples=100000, init_batch_size=1000, max_batch_size=2000000, poisson = False, save_to_class=False):
"""
Generate random samples from the joint Signal-Background distribution
using the accept-reject method with dynamic batch sizing.
This method uses an initial batch to estimate the acceptance rate and
dynamically adjusts the batch size to efficiently generate the required number
of samples.
Parameters
----------
desired_samples : int, optional
Total number of samples to generate (default: 100,000).
init_batch_size : int, optional
Batch size for the initial sampling to estimate the acceptance rate (default: 1,000).
max_batch_size : int, optional
Maximum batch size for iterations to prevent overloading memory (default: 2,000,000).
May need adjusting for devices with limited memory.
poisson : bool, optional
If True, the total number of samples (`desired_samples`) will be drawn
from a Poisson distribution with a mean of `desired_samples` (default: False).
save_to_class : bool, optional
If True, the generated samples will be saved as an attribute of the class
(`self.samples`) for later use (default: False).
Returns
-------
np.ndarray
Array of shape (desired_samples, 2) containing the sampled (X, Y) points.
Raises
------
ValueError
If any of the bounds are not defined, this is required to generate sample
Notes
-----
- The initial batch estimates the acceptance rate as:
acceptance_rate = (Number of Accepted Samples in Initial Batch) / (Initial Batch Size)
- Subsequent batch sizes are calculated dynamically based on the acceptance rate and
the number of remaining desired samples, with a 10% buffer.
- A maximum batch size (`max_batch_size`) is enforced to ensure memory efficiency.
"""
if self.lower_bound_X is None or self.upper_bound_X is None or self.lower_bound_Y is None or self.upper_bound_Y is None:
raise ValueError("To preform Accept-reject sampling both X and Y limits must be set.")
# Apply Poisson variation if enabled
if poisson:
actual_samples = np.random.poisson(desired_samples)
else:
actual_samples = desired_samples
# Pre-allocate space for samples
samples = np.empty((actual_samples, 2))
sample_count = 0
# Run an inital batch to estimate the acceptance rate the dynamic batch size on
# Generates smaller batches: X, Y, Uniform Random, True Values
batch_X = np.random.uniform(self.lower_bound_X, self.upper_bound_X, size=init_batch_size)
batch_Y = np.random.uniform(self.lower_bound_Y, self.upper_bound_Y, size=init_batch_size)
batch_uniform = np.random.uniform(0, self.max_pdf, size=init_batch_size)
batch_true_vals = self.pdf(batch_X, batch_Y)
# Compare Uniform Random to find the accepted samples, where within pdf
# Each index is a boolean value
batch_accept_index = batch_uniform <= batch_true_vals
# Collect x and y samples that are accepted
accepted_samples = np.array([batch_X[batch_accept_index], batch_Y[batch_accept_index]]).T
# Add samples to overall score the samples array with the accepted samples
num_accepted = len(accepted_samples)
samples[:num_accepted] = accepted_samples
sample_count += num_accepted
# Estimate acceptance rate
# Use np.sum to count the number of True values in the array
acceptance_rate = num_accepted/ init_batch_size
# While samples is less than desired number - produce samples to account for acceptance rate
# Generate samples in batches until the desired number of samples is reached - ideally in 1 batch
while sample_count < actual_samples:
# Calculate the remaining desired samples
remain_actual_samples = actual_samples - sample_count
# Calculate the batch size to generate based on remaining samples and acceptance rate
# A maximum batch size, 500000, is set to prevent overloading memory
# A 10% buffer is added to the batch size to ensure enough samples are generated
batch_size = min(int(1.1*(remain_actual_samples / acceptance_rate)), max_batch_size)
# Generate a new batch of proposals using the same method as above
batch_X = np.random.uniform(self.lower_bound_X, self.upper_bound_X, size=batch_size)
batch_Y = np.random.uniform(self.lower_bound_Y, self.upper_bound_Y, size=batch_size)
batch_uniform = np.random.uniform(0, self.max_pdf, size=batch_size)
batch_true_vals = self.pdf(batch_X, batch_Y)
batch_accept_index = batch_uniform <= batch_true_vals
accepted_samples = np.array([batch_X[batch_accept_index], batch_Y[batch_accept_index]]).T
num_accepted = len(accepted_samples)
remaining_space = actual_samples - sample_count
# Ensure the number of items added does not exceed the remaining space
if num_accepted > remaining_space:
samples[sample_count:] = accepted_samples[:remaining_space]
sample_count += remaining_space
else:
samples[sample_count:sample_count + num_accepted] = accepted_samples
sample_count += num_accepted
if save_to_class:
# Store to the class
self.samples = samples
return samples
[docs]
def plot_samples(self, samples = None):
"""
Plot the results of the sampled data in a 2x2 grid:
- Top-left: 3D histogram of the joint distribution.
- Top-right: Surface plot of the joint PDF.
- Bottom-left: Histogram of sampled X values vs marginal PDF.
- Bottom-right: Histogram of sampled Y values vs marginal PDF.
Parameters
----------
samples : np.ndarray, optional
Array of shape (N, 2) containing the sampled data points (X, Y).
If not provided, the method attempts to use `self.samples`.
If neither is available, a ValueError is raised.
Raises
------
ValueError
If no samples are provided and no samples are stored in `self.samples`.
"""
if samples is None and self.samples is None:
raise ValueError("No samples have been generated. Please run the `accept_reject_sample` method first.")
if samples is None:
samples = self.samples
# Define ranges for X and Y based on bounds
X = np.linspace(self.lower_bound_X - 0.1*(self.upper_bound_X - self.lower_bound_X), self.upper_bound_X+ 0.1*(self.upper_bound_X - self.lower_bound_X), 1000)
Y = np.linspace(self.lower_bound_Y- 0.1*(self.upper_bound_Y - self.lower_bound_Y), self.upper_bound_Y+ 0.1*(self.upper_bound_Y - self.lower_bound_Y), 1000)
# Compute the PDF grid for plotting
X_grid, Y_grid = np.meshgrid(X, Y)
Z = self.pdf(X_grid, Y_grid)
# Set consistent font sizes
label_fontsize = 18
title_fontsize = 18
tick_fontsize = 16
fig = plt.figure(figsize=(14, 12))
# Create a 2x2 grid of plots
spec = gridspec.GridSpec(2, 2, width_ratios=[1, 1], height_ratios=[1, 1])
# Top left: 3D histogram of samples, using similiar colour map scheme to pdf plot
ax1 = fig.add_subplot(spec[0, 0], projection='3d')
hist, xedges, yedges = np.histogram2d(samples[:, 0], samples[:, 1], bins=30,
range=[[self.lower_bound_X, self.upper_bound_X],
[self.lower_bound_Y, self.upper_bound_Y]])
xpos, ypos = np.meshgrid(xedges[:-1], yedges[:-1], indexing="ij")
xpos, ypos = xpos.ravel(), ypos.ravel()
zpos = np.zeros_like(xpos)
dx = dy = (xedges[1] - xedges[0])
dz = hist.ravel()
# Apply colormap
cmap = get_cmap('viridis')
norm = Normalize(vmin=dz.min(), vmax=dz.max())
colors = cmap(norm(dz))
ax1.bar3d(xpos, ypos, zpos, dx, dy, dz, shade=True, color=colors)
ax1.set_xlabel("X", fontsize=label_fontsize)
ax1.set_ylabel("Y", fontsize=label_fontsize)
ax1.set_zlabel("Frequency", fontsize=label_fontsize, labelpad = 20)
ax1.tick_params(axis='both', labelsize=tick_fontsize)
ax1.view_init(elev=30, azim=60)
ax1.set_title("3D Histogram of Joint Distribution", fontsize=title_fontsize)
# Top-right: Surface plot of the joint PDF for comparison with histogram in top-left
ax2 = fig.add_subplot(spec[0, 1], projection='3d')
ax2.plot_surface(X_grid, Y_grid, Z, cmap='viridis', alpha=0.9)
ax2.set_xlabel("X", fontsize=label_fontsize)
ax2.set_ylabel("Y", fontsize=label_fontsize, labelpad = 10)
ax2.set_zlabel("PDF Value", fontsize=label_fontsize, labelpad = 20)
ax2.tick_params(axis='both', labelsize=tick_fontsize)
ax2.view_init(elev=30, azim=60)
ax2.set_title("Surface Plot of True Joint PDF", fontsize=title_fontsize)
# Bottom-left: Histogram of X values vs marginal PDF in X
ax3 = fig.add_subplot(spec[1, 0])
# Gives bin edges and counts
hist_X, bins_X, _ = ax3.hist(samples[:, 0], bins=30, density=True, alpha=0.5, color='navy', label="Sampled X")
# Calculate the bin centers
bin_centers_X = 0.5 * (bins_X[:-1] + bins_X[1:])
# Can ignore the signal and background contribution given (_) as they are not used
_, _, marginal_total_X = self.marginal_pdf_x(X)
ax3.plot(X, marginal_total_X, label="Marginal PDF X", color="red", linewidth=4)
ax3.set_xlim(X[0], X[-1])
ax3.set_xlabel("X", fontsize=label_fontsize)
ax3.set_ylabel("Density", fontsize=label_fontsize)
ax3.tick_params(axis='both', labelsize=tick_fontsize)
ax3.legend(fontsize=label_fontsize)
ax3.set_title("Histogram of X with Marginal PDF of X", fontsize=title_fontsize)
# Bottom-right: Histogram of samples Y values vs marginal PDF in Y
ax4 = fig.add_subplot(spec[1, 1])
# Gives bin edges and counts
hist_Y, bins_Y, _ = ax4.hist(samples[:, 1], bins=30, density=True, alpha=0.5, color='navy', label="Sampled Y")
# Calculate the bin centers
bin_centers_Y = 0.5 * (bins_Y[:-1] + bins_Y[1:])
# Can ignore the signal and background contribution given (_) as they are not used
_, _, marginal_total_Y = self.marginal_pdf_y(Y)
ax4.plot(Y, marginal_total_Y, label="Marginal PDF Y", color="red", linewidth=4)
ax4.set_xlim(Y[0], Y[-1])
ax4.set_xlabel("Y", fontsize=label_fontsize)
ax4.set_ylabel("Density", fontsize=label_fontsize)
ax4.tick_params(axis='both', labelsize=tick_fontsize)
ax4.legend(fontsize=label_fontsize)
ax4.set_title("Histogram of Y with Marginal PDF of X", fontsize=title_fontsize)
plt.tight_layout()
plt.show()
[docs]
def pdf_fitting(self, X, Y, mu, sigma, beta, m, lamb, mu_b, sigma_b, f):
"""
Calculate the joint Probability Density Function (PDF) for given parameters.
Parameters
----------
X, Y : float or np.ndarray
Values where the PDF is evaluated.
mu, sigma, beta, m, lamb, mu_b, sigma_b, f : float
Parameters to use for the calculation.
Returns
-------
np.ndarray
PDF values for the input X, Y.
"""
# Provides an undefined pdf which can then be adjusted during MLE fitting
return f * self.Signal.pdf_fitting(X, Y, mu, sigma, beta, m, lamb) + (1 - f) * self.Background.pdf_fitting(X, Y, mu_b, sigma_b)
[docs]
def cdf_fitting(self, X, Y, mu, sigma, beta, m, lamb, mu_b, sigma_b, f):
"""
Calculate the joint Probability Density Function (PDF) for given parameters.
Parameters
----------
X, Y : float or np.ndarray
Values where the PDF is evaluated.
mu, sigma, beta, m, lamb, mu_b, sigma_b, f : float
Parameters to use for the calculation.
Returns
-------
np.ndarray
PDF values for the input X, Y.
"""
# Provides an undefined pdf which can then be adjusted during MLE fitting
return f * self.Signal.cdf_fitting(X, Y, mu, sigma, beta, m, lamb) + (1 - f) * self.Background.cdf_fitting(X, Y, mu_b, sigma_b)
[docs]
def fit_params(self, initial_params, samples = None, print_results = False, save_to_class = False):
"""
Perform an extended maximum likelihood fit using `iminuit`.
This method fits the parameters of a model to the given data by minimizing the
negative log-likelihood using the `iminuit` package. The fit is based on
Extended Unbinned Maximum Likelihood (EUMLE).
Parameters
----------
initial_params : list of float
Initial guesses for the model parameters in the order:
[mu, sigma, beta, m, lamb, mu_b, sigma_b, f, N_expected].
samples : np.ndarray, optional
Observed data points of shape (N, 2), where each row represents a pair of (X, Y) values.
If not provided, the method attempts to use samples stored in the instance (`self.samples`).
print_results : bool, optional
If True, prints the `iminuit` results to the console. Default is False.
save_to_class : bool, optional
If True, saves the resulting `Minuit` object (`self.mi`) to the instance for later use.
Default is False.
Returns
-------
tuple
A tuple containing:
- `mi.values` : A dictionary of fitted parameter values.
- `mi.errors` : A dictionary of parameter uncertainties.
Raises
------
ValueError
If no data samples are provided or stored in the instance (`self.samples`).
RuntimeError
If the minimization process fails to converge (`migrad` is invalid).
Notes
-----
- Parameter limits are set based on physical significance and distribution constraints:
- The fit includes an error analysis using the `Hesse` algorithm to estimate parameter uncertainties.
- The `iminuit` object provides detailed information about the fit, including parameter correlations.
"""
# Allow for the samples to be passed in, if not try use the samples already generated and stored` class
if samples is None:
samples = self.samples
if samples is None:
raise ValueError("No samples have been provided or generated by the `accept_reject_sample` method first")
# Split the samples into X and Y so correct passing in
samples_x = samples[:, 0]
samples_y = samples[:, 1]
# Define the density function - required by iminuit Extended Unbinned Likelihood
def density(samples_in, mu, sigma, beta, m, lamb, mu_b, sigma_b, f, N):
samples_x, samples_y = samples_in
pdf_vals = self.pdf_fitting(samples_x, samples_y, mu, sigma, beta, m, lamb, mu_b, sigma_b, f)
return N, N * pdf_vals
# Create the negative log-likelihood function
neg_log_likelihood = ExtendedUnbinnedNLL((samples_x, samples_y), density)
# Create the Minuit object with initial guesses
mi = Minuit(neg_log_likelihood, mu=initial_params[0],sigma=initial_params[1], beta=initial_params[2],
m=initial_params[3], lamb=initial_params[4], mu_b=initial_params[5], sigma_b=initial_params[6],
f=initial_params[7], N=initial_params[8])
# Set parameter limits based on each paramaters restrictions and physical significance
# sigma > 0
mi.limits["sigma"] = (1e-3, None)
# beta > 0
mi.limits["beta"] = (1e-2, None)
# m > 1
mi.limits["m"] = (1.01, None)
# lambda > 0
mi.limits["lamb"] = (1e-3, None)
# sigma_b > 0
mi.limits["sigma_b"] = (1e-3, None)
# f in [0, 1]
mi.limits["f"] = (0, 1)
# N_expected > 0
mi.limits["N"] = (1e-3, None)
# Run the minimisation
mi.migrad()
if not mi.valid:
raise RuntimeError("Minimisation did not converge")
# Run the error analysis
mi.hesse()
if save_to_class:
# Save to class
self.mi = mi
if print_results:
print(mi)
# Return the Minuit Values and Errors
return mi.values, mi.errors
[docs]
def fit_params_results(self):
"""
Visualise the results of the parameter fitting process.
Returns
-------
1. Parameter Summary Table
2. Correlation Heatmap
Raises
------
ValueError
If the `Minuit` object (`self.mi`) is not available. Ensure that the `fit_params` method is run before calling this method.
Notes
-----
- True values must be stored in `self.true_params` for comparison in the table.
- Fitted values and uncertainties are extracted from the `Minuit` object (`self.mi`).
- The correlation matrix is calculated from the `Minuit` covariance matrix.
"""
param_labels = [r"$\mu$",r"$\sigma$",r"$\beta$",r"$m$",r"$\lambda$",r"$\mu_b$",r"$\sigma_b$",r"$f$",r"$N$"]
# check data has actually been fitted to and results are available
if not hasattr(self, "mi"):
raise ValueError("Minuit object not available. Please run fit_params first.")
# Define parameter parent distributions for the table
# stored in the list format (parent distribution(group), name, fitted value, error, true value)=
params_table = [
# Crystal Ball parameters
("Crystal Ball (Signal)", "mu", self.mi.values["mu"], self.mi.errors["mu"], self.true_params[0]),
("Crystal Ball (Signal)", "sigma", self.mi.values["sigma"], self.mi.errors["sigma"], self.true_params[1]),
("Crystal Ball (Signal)", "beta", self.mi.values["beta"], self.mi.errors["beta"], self.true_params[2]),
("Crystal Ball (Signal)", "m", self.mi.values["m"], self.mi.errors["m"], self.true_params[3]),
# Exponential Decay parameter
("Exponential (Signal)", "lamb", self.mi.values["lamb"], self.mi.errors["lamb"], self.true_params[4]),
# Normal parameters
("Normal (Background)", "mu_b", self.mi.values["mu_b"], self.mi.errors["mu_b"], self.true_params[5]),
("Normal (Background)", "sigma_b", self.mi.values["sigma_b"], self.mi.errors["sigma_b"], self.true_params[6]),
# Overall parameters
("Overall", "f", self.mi.values["f"], self.mi.errors["f"], self.true_params[7]),
("Overall", "N", self.mi.values["N"], self.mi.errors["N"], None),
]
# Create a dictionary to store the fitting results within the class simultaneously
self.fit_results = {}
# Add columns for "Value ± Error" and "Number of Standard Errors Away"
formatted_table = []
for group, name, value, error, true_val in params_table:
# Save the results to the overall class dictionary - one at a time
self.fit_results[name] = {"value": value, "error": error, "distribution": group}
# String of the value with error
value_with_error = f"{value:.4f} ± {error:.4f}"
# Calculate the number of standard errors away from the true value
if true_val is not None:
num_std_errs = abs(value - true_val) / error
else:
num_std_errs = "N/A"
# Append the formatted row to the table
formatted_table.append([group, name, value_with_error, true_val, num_std_errs])
# Set the column headers
headers = ["Distribution", "Parameter", "Value ± Error", "True Value", "Std Errors Away"]
print(tabulate(formatted_table, headers=headers, floatfmt=".4f", tablefmt="pretty"))
# Get the correlation matrix and parameter names using mi from minuit object
correlation_matrix = self.mi.covariance.correlation()
parameters = self.mi.parameters
fig, ax = plt.subplots(figsize=(8, 8))
# Plot the heatmap
cax = ax.matshow(correlation_matrix, cmap='coolwarm', vmin=-1, vmax=1)
# Add a colorbar
cbar = plt.colorbar(cax, fraction=0.046, pad=0.04)
cbar.ax.tick_params(labelsize=14)
ax.set_xticks(range(len(parameters)))
ax.set_yticks(range(len(parameters)))
ax.set_xticklabels(param_labels, fontsize=14, rotation=45)
ax.set_yticklabels(param_labels, fontsize=14)
# Put the values in the centre of the cells for the heat map
for (i, j), val in np.ndenumerate(correlation_matrix):
ax.text(j, i, f'{val:.2f}', ha='center', va='center', color='black', fontsize=12)
plt.tight_layout()
plt.show()
[docs]
def plot_profiled_likelihoods(self):
"""
Plot the profiled log-likelihoods for all parameters in a 3x3 grid with improved formatting.
Returns
-------
A 3x3 grid of plots, where each subplot corresponds to a parameter's profiled log-likelihood (`-ln L`) as a function of its value.
The plots highlight the 1σ and 2σ confidence intervals for each parameter.
Raises
------
ValueError
If the `Minuit` object (`self.mi`) is not available. Ensure that the `fit_params` method has been run successfully before calling this method.
"""
# Check if the class has a Minuit object stored
if not hasattr(self, "mi"):
raise ValueError("Fit has not been performed. Run fit_params() first.")
# LaTeX labels for parameters - for use on plot axis
param_labels = {
"mu": r"$\mu$",
"sigma": r"$\sigma$",
"beta": r"$\beta$",
"m": r"$m$",
"lamb": r"$\lambda$",
"mu_b": r"$\mu_b$",
"sigma_b": r"$\sigma_b$",
"f": r"$f$",
"N": r"$N$"
}
# Plot standard preferences
plot_config = {
"xlabel_fontsize": 14,
"ylabel_fontsize": 14,
"title_fontsize": 16,
"tick_fontsize": 12,
"text_fontsize": 12,
"legend_fontsize": 12,
"line_width": 2,
"grid": True,
}
# Recall parameter names
params = self.mi.parameters
# Create a 3x3 grid for plotting
fig, axes = plt.subplots(3, 3, figsize=(15, 15))
axes = axes.flatten()
# For all parameters a seperate subplot is created
for i, param in enumerate(params):
ax = axes[i]
print(f'Plotting profile for {param}')
# Get the likelihood profile data using iminuit's `mnprofile`
# scan_values: where the nll is evaluated
# nll_values: the nll values
# success_flags: whether the minimization was successful
scan_values, nll_values, success_flags = self.mi.mnprofile(param, bound=2, size=50)
# Only inlclude points in which the minimization was successful
scan_values = scan_values[success_flags]
nll_values = nll_values[success_flags]
# Shift nll values so is it standardised with minimum value at 0
nll_values = nll_values - np.min(nll_values)
# Find the 1 sigma and 2 sigma confidence intervals
# The best fit/ minimium point
min_value = scan_values[np.argmin(nll_values)]
# where the nll values are less than 0.5 and 2.0
lhs_1sigma = scan_values[np.where(nll_values <= 0.5)[0][0]]
rhs_1sigma = scan_values[np.where(nll_values <= 0.5)[0][-1]]
lhs_2sigma = scan_values[np.where(nll_values <= 2.0)[0][0]]
rhs_2sigma = scan_values[np.where(nll_values <= 2.0)[0][-1]]
# Calculate the difference from the minimium value - ie where it will be on graph
lhs_1sigma_diff = lhs_1sigma - min_value
rhs_1sigma_diff = rhs_1sigma - min_value
lhs_2sigma_diff = lhs_2sigma - min_value
rhs_2sigma_diff = rhs_2sigma - min_value
# Plot the likelihood profile
ax.plot(scan_values, nll_values, color="blue", linewidth=plot_config["line_width"], label=r"$-\ln\mathcal{L}$")
# Shade the 1 sigma interval under the curve in green
ax.fill_between(
scan_values,
0, nll_values,
where=((scan_values >= lhs_1sigma) & (scan_values <= rhs_1sigma)),
color="green",
alpha=0.3,
label=r"$1\sigma$ interval"
)
# Shade the 2 sigma interval under the curve in red
ax.fill_between(
scan_values,
0, nll_values,
where=((scan_values >= lhs_2sigma) & (scan_values <= lhs_1sigma)) |
((scan_values >= rhs_1sigma) & (scan_values <= rhs_2sigma)),
color="red",
alpha=0.3,
label=r"$2\sigma$ interval"
)
# Add horizontal lines for delta log like = 0.5 and 2 (ie 1 sigma and 2 sigma)
ax.axhline(0.5, color="black", linestyle="--", linewidth=1)
ax.axhline(2.0, color="black", linestyle="--", linewidth=1)
# Put text on the horizontal lines
ax.text(
scan_values[0], 0.5 + 0.1, r"$\Delta\ln\mathcal{L} = 0.5$",
fontsize=plot_config["text_fontsize"], color="black", verticalalignment="bottom")
ax.text(
scan_values[0], 2.0 + 0.1, r"$\Delta\ln\mathcal{L} = 2.0$",
fontsize=plot_config["text_fontsize"], color="black", verticalalignment="bottom")
# Add vertical dashed lines for 1 sigma and 2 sigma intervals
ax.axvline(lhs_1sigma, color="gray", linestyle="--")
ax.axvline(rhs_1sigma, color="gray", linestyle="--")
ax.axvline(lhs_2sigma, color="gray", linestyle="--", alpha=0.7)
ax.axvline(rhs_2sigma, color="gray", linestyle="--", alpha=0.7)
# Display the left and right confidence intervals for both 1 sigma and 2 sigma in the top right
ax.text(
0.98, 0.98,
f"$1\sigma$: [{lhs_1sigma_diff:.3f}, {rhs_1sigma_diff:.3f}]\n"
f"$2\sigma$: [{lhs_2sigma_diff:.3f}, {rhs_2sigma_diff:.3f}]",
transform=ax.transAxes,
fontsize=plot_config["text_fontsize"],
verticalalignment="top",
horizontalalignment="right",
bbox=dict(boxstyle="round", facecolor="white", edgecolor="gray")
)
# set title and labels
ax.set_xlabel(param_labels.get(param, param), fontsize=plot_config["xlabel_fontsize"])
ax.set_ylabel(r"$-\ln\mathcal{L}$", fontsize=plot_config["ylabel_fontsize"])
ax.tick_params(axis="both", labelsize=plot_config["tick_fontsize"])
# Add legend
handles, labels = ax.get_legend_handles_labels()
filtered_handles_labels = [(h, l) for h, l in zip(handles, labels) if l in [r"$-2\ln\mathcal{L}$", r"$1\sigma$ interval", r"$2\sigma$ interval"]]
handles, labels = zip(*filtered_handles_labels)
ax.legend(handles, labels, fontsize=plot_config["legend_fontsize"], loc="upper left")
# Add grid look
if plot_config["grid"]:
ax.grid(True)
plt.tight_layout()
plt.show()
[docs]
def param_bootstrap_samples(self, num_samples, sample_sizes, output_directory="Bootstrap/Samples", poisson=False):
"""
Generate bootstrap samples with different sizes using the accept-reject method and save them as single files.
Starts fresh by deleting the existing directory.
Parameters
----------
num_samples : int
The number of bootstrap samples to generate for each sample size.
sample_sizes : list of int
The base sizes of each bootstrap sample. Each size will have a separate output file.
output_directory : str, optional
Directory to save the generated samples as .npy files (default: "Bootstrap/Samples").
poisson : bool, optional
Whether to apply Poisson variation to the sample sizes (default: False).
Notes
-----
- Each file will be named "Samples_No_<num_samples>_Size_<sample_size>.npy".
- Each file contains a list of arrays, where each array corresponds to a sample with its actual size.
"""
# Ensure the output directory exists and is empty
if os.path.exists(output_directory):
shutil.rmtree(output_directory)
os.makedirs(output_directory)
# Loop over each sample size, storing each set in a separate file
for size in sample_sizes:
print(f"Generating {num_samples} samples with base size {size}...")
# List to store samples - must be list to must store variable sizes due to Poisson
all_samples = []
# Generate the desired number of samples
for i in range(num_samples):
# Use Poisson distribution if poisson = True otherwise use the base size
if poisson:
actual_size = np.random.poisson(size)
else:
actual_size = size
# Generate a sample with the determined size
sample = self.accept_reject_sample(desired_samples=actual_size)
# Add to overall list
all_samples.append(sample)
# Save list of samples as list of arrays
file_name = f"Samples_No_{num_samples}_BaseSize_{size}.npy"
file_path = os.path.join(output_directory, file_name)
np.save(file_path, np.array(all_samples, dtype=object), allow_pickle=True)
print(f"Bootstrap samples with base size {size} saved to {file_path}")
[docs]
def param_bootstrap_fit(self,initial_params , input_directory="Bootstrap/Samples", output_directory="Bootstrap/Param_Results"):
"""
Perform parameter fitting using `fit_params` method on bootstrap samples and save the results.
Parameters
----------
initial_params : list
Initial guesses for the parameters [mu, sigma, beta, m, lamb, mu_b, sigma_b, f].
input_directory : str, optional
Path to the directory containing bootstrap sample files (default: "Bootstrap/Samples").
Each file should follow the naming convention "Samples_No_<num_samples>_BaseSize_<sample_size>.npy".
output_directory : str, optional
Path to the directory where the fitting results will be saved (default: "Bootstrap/Param_Results").
Each result file will be named "ParamResults_No_<num_samples>_BaseSize_<sample_size>.npy".
Returns
-------
For each sample file:
- A `.npy` file containing an array of shape (2, num_samples, num_params):
- First dimension (`values`): Fitted parameter values for each sample.
- Second dimension (`errors`): Corresponding errors for each parameter.
- Prints the number of non-converging samples and the output file path.
Raises
------
ValueError
If the `fit_params` method encounters invalid inputs or fitting fails for unexpected reasons.
"""
# Ensure the output directory exists and is empty
if os.path.exists(output_directory):
shutil.rmtree(output_directory)
os.makedirs(output_directory)
# Repeat for all files in the input directory
for file_name in os.listdir(input_directory):
# Ensure it is a numpy file - ie the samples
if file_name.endswith(".npy"):
# Extract the sample size and number samples from the file name
match = re.search(r"Samples_No_(\d+)_BaseSize_(\d+).npy", file_name)
if not match:
continue
num_samples = int(match.group(1))
sample_size = int(match.group(2))
print(f"Processing bootstrap samples of size {sample_size}...")
# Load the bootstrap samples data from the file
file_path = os.path.join(input_directory, file_name)
samples = np.load(file_path, allow_pickle=True)
# Add the Base sample size to the initial parameters as the expected number of events
initial_params_samples = initial_params.copy()
initial_params_samples.append(sample_size)
num_params = len(initial_params_samples)
# Pre define arrays to store the results for faster computation
values = np.full((num_samples, num_params), np.nan)
errors = np.full((num_samples, num_params), np.nan)
non_converged_count = 0
# Repeat parameter fitting for each sample in the file
for i, sample in enumerate(samples):
try:
# Perform the parameter fitting using `fit_params`
fit_results, fit_errors = self.fit_params(initial_params_samples, sample)
values[i] = fit_results
errors[i] = fit_errors
except Exception as e:
# If fitting fails, log the failure and continue
print(f"Sample {i + 1} (size {sample_size}) did not converge")
non_converged_count += 1
# Combine results into a single array
results = np.array([values, errors])
# Save the results to the file
output_file_name = f"ParamResults_No_{num_samples}_BaseSize_{sample_size}.npy"
output_file_path = os.path.join(output_directory, output_file_name)
np.save(output_file_path, results, allow_pickle=True)
print(f"In total {non_converged_count} samples did not converge out of {num_samples}.")
print(f"Results saved to {output_file_path}\n ")
[docs]
def param_bootstrap_analysis(self, input_directory="Bootstrap/Param_Results", output_directory="Bootstrap/Plots"):
"""
Analyse and visualise parameter fitting results from bootstrap sample, storing plots in the output directory.
Parameters
----------
input_directory : str, optional
Path to the directory containing bootstrap fitting result files (default: "Bootstrap/Param_Results").
Each file is expected to follow the naming convention:
"ParamResults_No_<num_samples>_BaseSize_<sample_size>.npy".
output_directory : str, optional
Path to the directory where plots will be saved (default: "Bootstrap/Plots").
Subdirectories for histograms, trends, and pull plots are created or cleared before use.
Returns
-------
This method generates the following plots:
1. Histograms for each parameter (value, error, and pull) across bootstrap samples.
2. Bias and Error trends vs. sample size for each parameter.
4. Pull distributions for each parameter across all samples.
results : dict
A dictionary storing computed metrics for each sample size. Keys are sample sizes, and values are dictionaries with:
- "Values_Mean", "Values_Std", "Values_Bias", "Errors_Mean", "Errors_Std", "Pull_Mean", "Pull_Std", "Pull_Mean_Error", "Pull_Std_Error".
Raises
------
FileNotFoundError
If the input directory does not exist or contains no valid files.
"""
# LaTeX labels for parameters - for use on plot axis
param_labels = {
"mu": r"$\mu$",
"sigma": r"$\sigma$",
"beta": r"$\beta$",
"m": r"$m$",
"lamb": r"$\lambda$",
"mu_b": r"$\mu_b$",
"sigma_b": r"$\sigma_b$",
"f": r"$f$",
"N": r"$N$"
}
# Standard preferences for plots
plot_config = {
"xlabel_fontsize": 16,
"ylabel_fontsize": 16,
"title_fontsize": 18,
"tick_fontsize": 14,
"suptitle_fontsize": 18,
"legend_fontsize": 14,
"text_fontsize": 14,
"line_width": 2,
"bar_alpha": 0.6, # Add this key for bar transparency
"region_alpha": 0.3, # Transparency for shaded regions
"text_boxstyle": dict(boxstyle="round", facecolor="white", edgecolor="gray"), # Box for μ, σ text
}
# Clear and recreate the output directories for plots
for data_label in ["Value", "Error", "Pull"]:
sub_dir = f"{output_directory}/{data_label}_Histograms"
if os.path.exists(sub_dir):
shutil.rmtree(sub_dir)
os.makedirs(sub_dir)
sub_dir = f"{output_directory}/Trends_with_Samples_Size"
if os.path.exists(sub_dir):
shutil.rmtree(sub_dir)
os.makedirs(sub_dir)
sub_dir = f"{output_directory}/Pull_Plots"
if os.path.exists(sub_dir):
shutil.rmtree(sub_dir)
os.makedirs(sub_dir)
# Initialise the results dictionary
results = {}
# Loop over each file in the input directory - storing the results of paraemter fitting
for filename in os.listdir(input_directory):
if filename.endswith(".npy"):
match = re.search(r"ParamResults_No_(\d+)_BaseSize_(\d+).npy", filename)
if match:
sample_size = int(match.group(2))
print(f"Processing sample size: {sample_size}")
# Load the data from the file
filepath = os.path.join(input_directory, filename)
data = np.load(filepath)
Values = data[0]
Errors = data[1]
# Calculate the true values for the sample size - using class stored and the samples size from file name
Truth = self.true_params.copy()
Truth.append(sample_size)
Truth = np.array(Truth)
Pull = (Values - Truth) / Errors
# Calculate the mean and standard deviation of the values, errors, and pulls
calc_values = {
"Values_Mean": np.nanmean(Values, axis=0),
"Values_Std": np.nanstd(Values, axis=0),
"Values_Bias": np.nanmean(Values, axis=0) - Truth,
"Errors_Mean": np.nanmean(Errors, axis=0),
"Errors_Std": np.nanstd(Errors, axis=0),
"Pull_Mean": np.nanmean(Pull, axis=0),
"Pull_Std": np.nanstd(Pull, axis=0),
"Pull_Mean_Error": np.nanstd(Pull, axis=0) / np.sqrt(Pull.shape[0]), # SEM of the pull mean
"Pull_Std_Error": np.nanstd(Pull, axis=0) / np.sqrt(2 * Pull.shape[0]), # Error on pull std
}
# Add the calculated results to the results dictionary under key of sample size
results[sample_size] = calc_values
# Dictionary to map labels to data arrays for plotting efficiency
data_types = {
"Value": Values,
"Error": Errors,
"Pull": Pull,}
# Create Pull Distribution Plots
# Loop over data types: Values, Errors, and Pulls
for data_label, data_array in data_types.items():
# Each are saves to a separate subdirectory
sub_dir = f"{output_directory}/{data_label}_Histograms"
num_params = data_array.shape[1]
# Create a 3x3 grid for plotting histograms
fig, axes = plt.subplots(3, 3, figsize=(15, 15))
axes = axes.flatten()
# Loop over each parameter and plot its histogram in each subplot in the grid
for i in range(num_params):
param_values = data_array[:, i]
param_values = param_values[~np.isnan(param_values)]
# Remove the last 2% and top 2% of values
lower_limit = np.percentile(param_values, 2)
upper_limit = np.percentile(param_values, 98)
param_values = param_values[(param_values >= lower_limit) & (param_values <= upper_limit)]
# Find the histogram characterists and the error on each bar
bins = np.histogram_bin_edges(param_values, bins=15)
hist, bin_edges = np.histogram(param_values, bins=bins)
bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:])
bin_widths = bin_edges[1:] - bin_edges[:-1]
hist_errors = np.sqrt(hist)
# Plot the histogram
ax = axes[i]
ax.bar(bin_centers, hist, width=bin_widths, alpha=0.6, label='Histogram', color='blue', edgecolor='black')
ax.errorbar(bin_centers, hist, yerr=hist_errors, fmt='k.', capsize=3, label='Error Bars')
# Calculate mean and std for gaussian
mu = np.nanmean(param_values)
std = np.nanstd(param_values)
# Compute Gaussian PDF
x = np.linspace(mu - 3 * std, mu + 3 * std, 100)
pdf = norm.pdf(x, mu, std)
# Scale Gaussian PDF to match the histogram
pdf_scaled = pdf * np.sum(hist) * bin_widths[0]
ax.plot(x, pdf_scaled, 'r-', label='Gaussian Fit')
# Add labels using param_labels dictioanry
param_name = list(param_labels.keys())[i]
param_label = param_labels.get(param_name, f"Param {i + 1}")
# Display mu and sigma of plot in the top-left corner
ax.text(
0.02, 0.98, f"{data_label}: {param_label}\n {mu:.2f}$\pm${std:.2f}",
transform=ax.transAxes, fontsize=plot_config["text_fontsize"],
verticalalignment='top', horizontalalignment='left',
bbox=plot_config["text_boxstyle"]
)
# Set the plot labels and legend
ax.set_xlabel(f"{data_label} : {param_label}", fontsize=plot_config["xlabel_fontsize"])
ax.set_ylabel("Counts", fontsize=plot_config["ylabel_fontsize"])
ax.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
ax.legend(loc='upper right', fontsize=plot_config["legend_fontsize"])
# Set the overall title of grid
fig.suptitle(
f"Histograms of Parameter {data_label} - Sample Size {sample_size}",
fontsize=plot_config["suptitle_fontsize"])
plt.tight_layout(rect=[0, 0, 1, 0.96])
# Save the plot
save_path = f"{sub_dir}/{data_label}_histograms_{sample_size}.png"
plt.savefig(save_path)
plt.close(fig)
print(f"Histogram of {data_label} saved in {sub_dir}")
# Define a function for the 1/n-shaped curve
def bias_curve(n, a):
return a / n
# Create Bias and Error against sample size summary plots
# Sort the sample sizes so the plots are in order
sample_sizes = sorted(results.keys())
num_params = len(param_labels)
# Plot Bias vs. Sample Size
fig, axes = plt.subplots(3, 3, figsize=(13, 9))
axes = axes.flatten()
# Plot for all parameters except N as this is the expected number of events and not comparable
for i in range(num_params - 1):
# Determine Absolute Bias for each parameter
biases = [abs(results[sample_size]["Values_Bias"][i]) for sample_size in sample_sizes]
standard_errors = [abs(results[sample_size]["Values_Std"][i]) for sample_size in sample_sizes]/np.sqrt(250)
# Fit the 1/n curve to the biases
popt, _ = curve_fit(bias_curve, sample_sizes, biases)
a_fit = popt[0] # Extract the fitted parameter
# Generate a smooth line for the fitted curve
sample_sizes_smooth = np.linspace(min(sample_sizes), max(sample_sizes), 500)
fitted_biases = bias_curve(sample_sizes_smooth, a_fit)
# Plot the bias and the fitted curve
ax = axes[i]
ax.errorbar(sample_sizes, biases, yerr=standard_errors, fmt='-s', color='red', label='Bias ± Std. Error', capsize=4)
ax.plot(sample_sizes_smooth, fitted_biases, linestyle='--', color='black', label=r'$\frac{a}{n_{\mathrm{samples}}}$')
ax.set_xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
ax.set_ylabel(f"Abs(Bias) in {list(param_labels.values())[i]}", fontsize=plot_config["ylabel_fontsize"])
ax.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
ax.grid(True)
ax.legend(fontsize=plot_config["legend_fontsize"])
# Remove unused subplots, make bottom-right blank
for j in range(num_params, 9):
if j == 8: # Bottom-right subplot
axes[j].set_axis_off() # Turn off the axis to leave it blank
else:
fig.delaxes(axes[j]) # Remove unused axes
plt.tight_layout(rect=[0, 0, 1, 0.96])
# Save Plot
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Bias_vs_Sample_Size_With_Fitted.png")
plt.show()
plt.close(fig)
print(f"Bias vs Sample Size plot with fitted curves saved in {output_directory}/Trends_with_Samples_Size")
# Sample sizes and biases
biases = [abs(results[sample_size]["Values_Bias"][4]) for sample_size in sample_sizes]
standard_errors = [abs(results[sample_size]["Values_Std"][4]) for sample_size in sample_sizes]/np.sqrt(250)
# Fit the 1/n curve to the data
popt, pcov = curve_fit(bias_curve, sample_sizes, biases)
a_fit = popt[0] # Extract the fitted parameter
# Generate a smooth line for the fitted curve
sample_sizes_smooth = np.linspace(min(sample_sizes), max(sample_sizes), 500)
fitted_biases = bias_curve(sample_sizes_smooth, a_fit)
# Plot Bias vs. Sample Size for just Lambda
plt.figure(figsize=(6, 4))
plt.errorbar(sample_sizes, biases, yerr=standard_errors, fmt='-s', color='red', label='Bias ± Std. Error', capsize=4)
plt.plot(sample_sizes_smooth, fitted_biases, linestyle='--', color='black', label=r'$\frac{a}{n_{\mathrm{samples}}}$')
plt.xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
plt.ylabel(f"Abs(Bias) in {list(param_labels.values())[4]}", fontsize=plot_config["ylabel_fontsize"])
plt.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
plt.grid(True)
plt.legend(fontsize=plot_config["legend_fontsize"])
plt.tight_layout()
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Bias_vs_Sample_Size_Lambda_Fitted.png")
plt.show()
plt.close()
print(f"Bias vs Sample Size plot with fitted curve saved in {output_directory}/Trends_with_Samples_Size_Lambda_Fitted")
# Plot Values_Std vs. Sample Size
fig, axes = plt.subplots(3, 3, figsize=(13, 9))
axes = axes.flatten()
# Plot for all parameters except N as this is the expected number of events and not comparable
for i in range(num_params - 1):
# Determine the uncertainty for each parameter value
values_std = [results[sample_size]["Values_Std"][i] for sample_size in sample_sizes]
errors_mean = [results[sample_size]["Errors_Mean"][i] for sample_size in sample_sizes]
errors_std = [results[sample_size]["Errors_Std"][i] for sample_size in sample_sizes]
ax = axes[i]
ax.plot(sample_sizes, values_std, marker='s', linestyle='-', color='black', label=r'Fitted Values $\sigma$')
ax.errorbar(sample_sizes, errors_mean, yerr=errors_std, fmt='s', linestyle='-', color='red', label=r'Hessian Errors $\mu \pm \sigma $', capsize=4)
ax.set_xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
ax.set_ylabel(f"Uncertainty in {list(param_labels.values())[i]}", fontsize=plot_config["ylabel_fontsize"])
ax.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
ax.grid(True)
ax.legend(fontsize=plot_config["legend_fontsize"])
plt.tight_layout(rect=[0, 0, 1, 0.96])
# Save Plot
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Errors_Mean_vs_Sample_Size.png")
plt.show() # Display in Jupyter Notebook
plt.close(fig)
print(f"Error vs Sample Size plot saved in {output_directory}/Trends_with_Samples_Size")
# Plot Values_Std vs. Sample Size
plt.figure(figsize=(6,4))
values_std = [results[sample_size]["Values_Std"][4] for sample_size in sample_sizes]
errors_mean = [results[sample_size]["Errors_Mean"][4] for sample_size in sample_sizes]
errors_std = [results[sample_size]["Errors_Std"][4] for sample_size in sample_sizes]
plt.plot(sample_sizes, values_std, marker='s', linestyle='-', color='black', label=r'Fitted Values $\sigma$')
plt.errorbar(sample_sizes, errors_mean, yerr=errors_std, fmt='s', linestyle='-', color='red', label=r'Hessian Errors $\mu \pm \sigma $', capsize=4)
plt.xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
plt.ylabel(f"Uncertainty in {list(param_labels.values())[4]}", fontsize=plot_config["ylabel_fontsize"])
plt.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
plt.grid(True)
plt.legend(fontsize=plot_config["legend_fontsize"])
plt.tight_layout()
# Save Plot
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Errors_Mean_vs_Sample_Size_Lambda.png")
plt.show() # Display in Jupyter Notebook
plt.close(fig)
print(f"Error vs Sample Size for Lambda only plot saved in {output_directory}/Trends_with_Samples_Size_Lambda.")
# Plot the Pull Distribution for each parameter - with a seperate plot for each sample size
for sample_size, calc_values in results.items():
num_params = len(param_labels)
fig, ax = plt.subplots(figsize=(10, 6))
# Add light grey shading for Pull between 1 and 2
ax.axvspan(-2, -1, color="lightgrey", alpha=0.4)
ax.axvspan(1, 2, color="lightgrey", alpha=0.4)
# Loop over parameters and plot horizontal bars
for i, (param, label) in enumerate(param_labels.items()):
pull_mean = calc_values["Pull_Mean"][i]
pull_std = calc_values["Pull_Std"][i]
pull_mean_error = calc_values["Pull_Mean_Error"][i]
pull_std_error = calc_values["Pull_Std_Error"][i]
# Light blue bar: between the centers of the two red bars
ax.barh(
y=i,
width=2 * pull_std,
left=pull_mean - pull_std,
height=0.4,
color="lightblue",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
# Dark blue region: centered at Pull Mean, between Pull Mean +/- Pull Mean Error
ax.barh(
y=i,
width=pull_mean_error * 2,
left=pull_mean - pull_mean_error,
height=0.4,
color="darkblue",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
# Red regions: centered at Pull Mean ± Pull Std, with widths defined by Pull Std Error
ax.barh(
y=i,
width=pull_std_error * 2,
left=pull_mean - pull_std - pull_std_error,
height=0.4,
color="red",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
ax.barh(
y=i,
width=pull_std_error * 2,
left=pull_mean + pull_std - pull_std_error,
height=0.4,
color="red",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
# Add labels and grid
ax.set_yticks(range(num_params))
ax.set_yticklabels(list(param_labels.values()), fontsize=plot_config["ylabel_fontsize"])
ax.set_xlabel("Pull", fontsize=plot_config["xlabel_fontsize"])
ax.set_title(f"Pull Distributions for Sample Size {sample_size}", fontsize=plot_config["title_fontsize"])
ax.tick_params(axis="both", which="major", labelsize=plot_config["tick_fontsize"])
ax.axvline(0, color="black", linestyle="--", linewidth=1)
ax.axvline(-2, color="brown", linestyle="dashdot", linewidth=1)
ax.axvline(2, color="brown", linestyle="dashdot", linewidth=1)
ax.grid(True, axis="x")
# Save the plot
save_path = os.path.join(output_directory, f"Pull_Plots/Pull_Distributions_{sample_size}.png")
plt.tight_layout()
plt.savefig(save_path)
plt.close(fig)
print(f"Saved pull distribution plot for sample size {sample_size} in {save_path}")
# Return the results dictionary
return results
[docs]
def fit_params_sWeights(self, initial_params, samples = None, print_results = False, norm_check = True):
"""
Fit parameters using the sWeights method.
This method performs the following steps:
1. Fits the X dimension using an extended unbinned likelihood method.
2. Calculates signal and background weights using the sWeights method.
3. Projects the signal distribution to the Y dimension.
4. Fits the Y dimension using a binned maximum likelihood estimation.
Parameters
----------
initial_params : list or array-like
Initial guesses for the parameters [mu, sigma, beta, m, f, lamb, N].
samples : np.ndarray, optional
The samples to be used for fitting. If not provided, the samples generated by the `accept_reject_sample` method will be used.
print_results : bool, optional
If True, prints the fitting results and plots the intermediate steps. Default is False.
norm_check : bool, optional
If True, performs normalization checks during the sWeights calculation. Default is True.
Returns
-------
mi_total_values : dict
A dictionary containing the fitted parameter values.
mi_total_errors : dict
A dictionary containing the errors of the fitted parameters.
Raises
------
ValueError
If no samples are provided or generated by the `accept_reject_sample` method.
RuntimeError
If the minimization does not converge.
"""
# Allow for the samples to be passed in, if not try use the samples already generated in class
if samples is None:
samples = self.samples
if samples is None:
raise ValueError("No samples have been provided or generated by the `accept_reject_sample` method first")
# Get the indices that would sort the first column
sorted_indices = np.argsort(samples[:, 0])
# Sort the entire array in the X dimension for easy plotting
samples = samples[sorted_indices]
samples_x = samples[:, 0]
samples_y = samples[:, 1]
# Define the density function - required by iminuit Extended Unbinned Likelihood
def density_x(samples_in, mu, sigma, beta, m, f, N):
samples_x = samples_in
pdf_vals = f * self.Signal.X.pdf_fitting(samples_x, mu, sigma, beta, m) + (1-f) * self.Background.X.pdf_fitting(samples_x)
return N, N * pdf_vals
# Create the negative log-likelihood function - to be used only for x dimension
neg_log_likelihood_x = ExtendedUnbinnedNLL(samples_x, density_x)
# Create the Minuit object with initial guesses
mi_x = Minuit(neg_log_likelihood_x, mu=initial_params[0],sigma=initial_params[1], beta=initial_params[2],
m=initial_params[3], f=initial_params[4], N=initial_params[6])
# Set parameter limits based on each paramaters restrictions and physical significance
# sigma > 0
mi_x.limits["sigma"] = (1e-3, None)
# beta > 0
mi_x.limits["beta"] = (1e-2, None)
# m > 1
mi_x.limits["m"] = (1.01, None)
# f in [0, 1]
mi_x.limits["f"] = (0, 1)
# N_expected > 0
mi_x.limits["N"] = (1e-3, None)
# Run the minimisation
mi_x.migrad()
if not mi_x.valid:
raise RuntimeError("Minimisation did not converge")
# Run the error analysis
mi_x.hesse()
if print_results:
print("Step 1 - Iminuit Extended MLE: X Dimension Fitting Results")
print(mi_x)
def signal_x_pdf_fit(x):
return self.Signal.X.pdf_fitting(x, mi_x.values["mu"], mi_x.values["sigma"], mi_x.values["beta"], mi_x.values["m"])
def background_x_pdf_fit(x):
return self.Background.X.pdf_fitting(x)
# Calculate the signal and background weights using the SWeight method
signal_count = mi_x.values["N"] * mi_x.values["f"]
background_count = mi_x.values["N"] * (1 - mi_x.values["f"])
xrange = (self.lower_bound_X, self.upper_bound_X)
sweighter = SWeight( samples_x, pdfs=[signal_x_pdf_fit,background_x_pdf_fit], yields=[signal_count,background_count], discvarranges=((xrange),), checks = norm_check)
signal_weight = sweighter.get_weight(0,samples_x)
background_weight = sweighter.get_weight(1,samples_x)
# Display the results
if print_results:
print("Step 2 - Sweights: Determine Signal and Background Weights from X")
fig, ax = plt.subplots()
ax.plot(samples_x, signal_weight, 'r--', label='Signal Weight')
ax.plot(samples_x, background_weight, 'b--', label='Background Weight')
ax.plot(signal_weight + background_weight, 'k-', label='Total Weight')
ax.legend()
ax.set_xlim(self.lower_bound_X, self.upper_bound_X)
ax.set_xlabel('$X$')
ax.set_ylabel('Weight')
plt.show()
# Project the distribution to the Y dimension by applying the weight
# Effectively giving signal distribution in Y
yrange = (self.lower_bound_Y, self.upper_bound_Y)
ysw, ye = np.histogram( samples_y, bins=50, range=yrange, weights=signal_weight)
ysw2, ye = np.histogram( samples_y, bins=50, range=yrange, weights=signal_weight**2)
cy = 0.5*(ye[1:]+ye[:-1])
if print_results:
print("Step 3 - Sweights: Project to Signal Distribution in Y")
# Plot to show the SWeighted data in Y compared to the true signal distribution
fig, ax = plt.subplots(figsize=(10, 6))
# Plot the weighted histogram with error bars
ax.errorbar(cy, ysw, yerr=ysw2**0.5, fmt='rx', label='All SWeighted Data in Y', markersize=5, capsize=4, capthick=1)
ax.hist(cy, bins=len(cy), weights=ysw, density=False, alpha=0.3, color='red')
# Plot the expected PDF curve
ax.plot(cy, (ye[2] - ye[1]) * mi_x.values["N"] * mi_x.values["f"] * self.Signal.Y.pdf(cy), label='Expected Signal Counts \n From True PDF', color='blue', linewidth=2)
ax.grid(visible=True, linestyle='--', linewidth=0.5)
ax.set_xlabel("Y", fontsize=14)
ax.set_ylabel("Counts", fontsize=14)
ax.legend(fontsize=12)
ax.tick_params(axis='both', which='major', labelsize=12)
ax.tick_params(axis='both', which='minor', labelsize=10)
plt.tight_layout()
plt.show()
# Define the CDF function for the signal in Y dimension for Binned MLE
def cdf_signal_y(x, lamb):
return self.Signal.Y.cdf_fitting(x, lamb)
# Perform Binned MLE for the signal in the Y dimension witht the whole data set
neg_log_likelihood_signal_y_binned = BinnedNLL(ysw, ye, cdf_signal_y)
mi_signal_y = Minuit(neg_log_likelihood_signal_y_binned, lamb=initial_params[5])
# Limit lamb > 0
mi_signal_y.limits["lamb"] = (1e-3, None)
mi_signal_y.migrad()
if not mi_signal_y.valid:
raise RuntimeError("Minimisation in Y did not converge")
# Run the error analysis
mi_signal_y.hesse()
if print_results:
print("Step 4 - Binned MLE: Signal in Y Dimension Fitting Results")
print(mi_signal_y)
# Explicitly construct dictionaries from ValueView objects
mi_x_values_dict = {key: mi_x.values[key] for key in mi_x.parameters}
mi_signal_y_values_dict = {key: mi_signal_y.values[key] for key in mi_signal_y.parameters}
mi_x_errors_dict = {key: mi_x.errors[key] for key in mi_x.parameters}
mi_signal_y_errors_dict = {key: mi_signal_y.errors[key] for key in mi_signal_y.parameters}
mi_total_values = {**mi_x_values_dict, **mi_signal_y_values_dict}
mi_total_errors = {**mi_x_errors_dict, **mi_signal_y_errors_dict}
if print_results:
print("\n Final summary of Results")
# Print table of results
true_values = {
"mu": self.true_params[0],
"sigma": self.true_params[1],
"beta": self.true_params[2],
"m": self.true_params[3],
"f": self.true_params[7],
"N": "N/A",
"lamb": self.true_params[4],
}
table_data = [
[
param,
f"{round(value, 4)} ± {round(mi_total_errors[param], 4)}",
true_values.get(param, "N/A") # Use "N/A" if no true value is provided
]
for param, value in mi_total_values.items()
]
headers = ["Parameter", "Value ± Error", "True Value"]
print(tabulate(table_data, headers=headers, tablefmt="grid"))
return mi_total_values, mi_total_errors
[docs]
def param_bootstrap_sWeights_fit(self,initial_params, norm_check = True, input_directory="Bootstrap/Samples", output_directory="Bootstrap/sWeights/Results"):
"""
Perform parameter fitting using sWeights for signal and background PDFs for all bootstrap samples.
This function fits the parameters of signal and background probability density functions (PDFs)
using extended unbinned maximum likelihood for the X dimension. It then calculates the signal
and background weights using the sWeight method and fits the signal PDF in the Y dimension using
binned maximum likelihood.
Parameters
----------
initial_params : list
Initial guesses for the parameters. The list must contain:
[mu, sigma, beta, m, f, lamb, N].
samples : numpy.ndarray, optional
Array of samples for fitting. If None, it uses samples generated or stored
in the class. Defaults to None.
print_results : bool, optional
If True, prints detailed fitting results and plots. Defaults to False.
norm_check : bool, optional
If True, enables normalization checks in the sWeight method. Defaults to True.
Returns
-------
tuple
Two dictionaries containing the fitted parameter values and their associated errors:
- mi_total_values (dict): Fitted parameter values.
- mi_total_errors (dict): Fitted parameter errors.
Raises
------
ValueError
If no samples are provided or available for fitting.
RuntimeError
If minimization for X or Y dimension does not converge.
"""
# Ensure the output directory exists and is empty
if os.path.exists(output_directory):
shutil.rmtree(output_directory)
os.makedirs(output_directory)
# Repeat for all files in the input directory
for file_name in os.listdir(input_directory):
# Ensure it is a numpy file - ie the samples
if file_name.endswith(".npy"):
# Extract the sample size and number samples from the file name
match = re.search(r"Samples_No_(\d+)_BaseSize_(\d+).npy", file_name)
if not match:
continue
num_samples = int(match.group(1))
sample_size = int(match.group(2))
print(f"Processing bootstrap samples of size {sample_size} using sWeights...")
# Load the bootstrap samples data from the file
file_path = os.path.join(input_directory, file_name)
samples = np.load(file_path, allow_pickle=True)
# Add the Base sample size to the initial parameters as the expected number of events
initial_params_samples = initial_params.copy()
initial_params_samples.append(sample_size)
num_params = len(initial_params_samples)
# Pre define arrays to store the results for faster computation
values = np.full((num_samples, num_params), np.nan)
errors = np.full((num_samples, num_params), np.nan)
non_converged_count = 0
# Repeat parameter fitting for each sample in the file
for i, sample in enumerate(samples):
try:
# Perform the parameter fitting using `fit_params_sWeights`
fit_results, fit_errors = self.fit_params_sWeights(initial_params_samples, sample, norm_check = norm_check)
values[i] = np.array(list(fit_results.values()))
errors[i] = np.array(list(fit_errors.values()))
except Exception as e:
# If fitting fails, log the failure and continue
print(f"Sample {i + 1} (size {sample_size}) did not converge")
print(e)
non_converged_count += 1
# Combine results into a single array
results = np.array([values, errors])
# Save the results to the file
output_file_name = f"ParamResults_No_{num_samples}_BaseSize_{sample_size}.npy"
output_file_path = os.path.join(output_directory, output_file_name)
np.save(output_file_path, results, allow_pickle=True)
print(f"In total {non_converged_count} samples did not converge out of {num_samples}.")
print(f"Results saved to {output_file_path}\n ")
[docs]
def param_bootstrap_sWeights_analysis(self, input_directory="Bootstrap/sWeights/Results", output_directory="Bootstrap/sWeights/Plots"):
"""
Perform a parametric bootstrap analysis using sWeights and generate plots.
This method performs the following steps:
1. Loads the bootstrap results from the specified input directory.
2. Calculates the mean, standard deviation, bias, and pull for each parameter.
3. Generates histograms for the values, errors, and pulls of each parameter.
4. Creates summary plots for bias and error against sample size.
5. Generates pull distribution plots for each parameter and sample size.
Parameters
----------
input_directory : str, optional
The directory containing the bootstrap results (.npy files). Default is "Bootstrap/sWeights/Results".
output_directory : str, optional
The directory where the plots will be saved. Default is "Bootstrap/sWeights/Plots".
Returns
-------
This method generates the following plots:
1. Histograms for each parameter (value, error, and pull) across bootstrap samples.
2. Bias and Error trends vs. sample size for each parameter.
4. Pull distributions for each parameter across all samples.
results : dict
A dictionary storing computed metrics for each sample size. Keys are sample sizes, and values are dictionaries with:
- "Values_Mean", "Values_Std", "Values_Bias", "Errors_Mean", "Errors_Std", "Pull_Mean", "Pull_Std", "Pull_Mean_Error", "Pull_Std_Error".
Raises
------
FileNotFoundError
If the input directory does not exist or contains no valid files.
"""
# LaTeX labels for parameters - for use on plot axis
param_labels = {
"mu": r"$\mu$",
"sigma": r"$\sigma$",
"beta": r"$\beta$",
"m": r"$m$",
"f" : r"$f$",
"N": r"$N$",
"lamb": r"$\lambda$",
}
# Standard preferences for plots
plot_config = {
"xlabel_fontsize": 16,
"ylabel_fontsize": 16,
"title_fontsize": 18,
"tick_fontsize": 14,
"suptitle_fontsize": 18,
"legend_fontsize": 14,
"text_fontsize": 14,
"line_width": 2,
"bar_alpha": 0.6, # Add this key for bar transparency
"region_alpha": 0.3, # Transparency for shaded regions
"text_boxstyle": dict(boxstyle="round", facecolor="white", edgecolor="gray"), # Box for μ, σ text
}
# Clear and recreate the output directories for plots
for data_label in ["Value", "Error", "Pull"]:
sub_dir = f"{output_directory}/{data_label}_Histograms"
if os.path.exists(sub_dir):
shutil.rmtree(sub_dir)
os.makedirs(sub_dir)
sub_dir = f"{output_directory}/Trends_with_Samples_Size"
if os.path.exists(sub_dir):
shutil.rmtree(sub_dir)
os.makedirs(sub_dir)
sub_dir = f"{output_directory}/Pull_Plots"
if os.path.exists(sub_dir):
shutil.rmtree(sub_dir)
os.makedirs(sub_dir)
# Initialise the results dictionary
results = {}
# Loop over each file in the input directory - storing the results of paraemter fitting
for filename in os.listdir(input_directory):
if filename.endswith(".npy"):
match = re.search(r"ParamResults_No_(\d+)_BaseSize_(\d+).npy", filename)
if match:
sample_size = int(match.group(2))
print(f"Processing sWeights fitted sample size: {sample_size}")
# Load the data from the file
filepath = os.path.join(input_directory, filename)
data = np.load(filepath)
Values = data[0]
Errors = data[1]
# Calculate the true values for the sample size - using class stored and the samples size from file name
Truth = [self.true_params[i] for i in [0, 1, 2, 3, 7]]
Truth.append(sample_size)
Truth.append(self.true_params[4])
Truth = np.array(Truth)
Pull = (Values - Truth) / Errors
# Calculate the mean and standard deviation of the values, errors, and pulls
calc_values = {
"Values_Mean": np.nanmean(Values, axis=0),
"Values_Std": np.nanstd(Values, axis=0),
"Values_Bias": np.nanmean(Values, axis=0) - Truth,
"Errors_Mean": np.nanmean(Errors, axis=0),
"Errors_Std": np.nanstd(Errors, axis=0),
"Pull_Mean": np.nanmean(Pull, axis=0),
"Pull_Std": np.nanstd(Pull, axis=0),
"Pull_Mean_Error": np.nanstd(Pull, axis=0) / np.sqrt(Pull.shape[0]), # SEM of the pull mean
"Pull_Std_Error": np.nanstd(Pull, axis=0) / np.sqrt(2 * Pull.shape[0]), # Error on pull std
}
# Add the calculated results to the results dictionary under key of sample size
results[sample_size] = calc_values
# Dictionary to map labels to data arrays for plotting efficiency
data_types = {
"Value": Values,
"Error": Errors,
"Pull": Pull,}
# Create Pull Distribution Plots
# Loop over data types: Values, Errors, and Pulls
for data_label, data_array in data_types.items():
# Each are saves to a separate subdirectory
sub_dir = f"{output_directory}/{data_label}_Histograms"
num_params = data_array.shape[1]
# Create a 3x3 grid for plotting histograms
fig, axes = plt.subplots(3, 3, figsize=(15, 15))
axes = axes.flatten()
# Loop over each parameter and plot its histogram in each subplot in the grid
for i in range(num_params):
param_values = data_array[:, i]
param_values = param_values[~np.isnan(param_values)]
# Remove the last 2% and top 2% of values
lower_limit = np.percentile(param_values, 2)
upper_limit = np.percentile(param_values, 98)
param_values = param_values[(param_values >= lower_limit) & (param_values <= upper_limit)]
# Find the histogram characterists and the error on each bar
bins = np.histogram_bin_edges(param_values, bins=15)
hist, bin_edges = np.histogram(param_values, bins=bins)
bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:])
bin_widths = bin_edges[1:] - bin_edges[:-1]
hist_errors = np.sqrt(hist)
# Plot the histogram
ax = axes[i]
ax.bar(bin_centers, hist, width=bin_widths, alpha=0.6, label='Histogram', color='blue', edgecolor='black')
ax.errorbar(bin_centers, hist, yerr=hist_errors, fmt='k.', capsize=3, label='Error Bars')
# Calculate mean and std for gaussian
mu = np.nanmean(param_values)
std = np.nanstd(param_values)
# Compute Gaussian PDF
x = np.linspace(mu - 3 * std, mu + 3 * std, 100)
pdf = norm.pdf(x, mu, std)
# Scale Gaussian PDF to match the histogram
pdf_scaled = pdf * np.sum(hist) * bin_widths[0]
ax.plot(x, pdf_scaled, 'r-', label='Gaussian Fit')
# Add labels using param_labels dictioanry
param_name = list(param_labels.keys())[i]
param_label = param_labels.get(param_name, f"Param {i + 1}")
# Display mu and sigma of plot in the top-left corner
ax.text(
0.02, 0.98, f"{data_label}: {param_label}\n {mu:.2f}$\pm${std:.2f}",
transform=ax.transAxes, fontsize=plot_config["text_fontsize"],
verticalalignment='top', horizontalalignment='left',
bbox=plot_config["text_boxstyle"]
)
# Set the plot labels and legend
ax.set_xlabel(f"{data_label} : {param_label}", fontsize=plot_config["xlabel_fontsize"])
ax.set_ylabel("Counts", fontsize=plot_config["ylabel_fontsize"])
ax.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
ax.legend(loc='upper right', fontsize=plot_config["legend_fontsize"])
# Set the overall title of grid
fig.suptitle(
f"Histograms of Parameter {data_label}- Using sWeights - Sample Size {sample_size}",
fontsize=plot_config["suptitle_fontsize"])
plt.tight_layout(rect=[0, 0, 1, 0.96])
# Save the plot
save_path = f"{sub_dir}/{data_label}_histograms_{sample_size}.png"
plt.savefig(save_path)
plt.close(fig)
print(f"Histogram of {data_label} saved in {sub_dir}")
# Define a function for the 1/n-shaped curve
def bias_curve(n, a):
return a / n
# Create Bias and Error against sample size summary plots
# Sort the sample sizes so the plots are in order
sample_sizes = sorted(results.keys())
num_params = len(param_labels)
# Plot Bias vs. Sample Size
fig, axes = plt.subplots(3, 2, figsize=(13, 9))
axes = axes.flatten()
# Plot for all parameters bar N as this is the expected number of events and not comparible
for i in [0,1,2,3,4,6]:
# Determine Absolute Bias for each parameter
biases = [abs(results[sample_size]["Values_Bias"][i]) for sample_size in sample_sizes]
standard_errors = [abs(results[sample_size]["Values_Std"][i]) for sample_size in sample_sizes]/np.sqrt(250)
# Fit the 1/n curve to the biases
popt, _ = curve_fit(bias_curve, sample_sizes, biases)
a_fit = popt[0] # Extract the fitted parameter
# Generate a smooth line for the fitted curve
sample_sizes_smooth = np.linspace(min(sample_sizes), max(sample_sizes), 500)
fitted_biases = bias_curve(sample_sizes_smooth, a_fit)
if i != 6:
ax = axes[i]
else:
ax = axes[5]
ax.errorbar(sample_sizes, biases, yerr=standard_errors, fmt='-s', color='red', label='Bias ± Std. Error', capsize=4)
ax.plot(sample_sizes_smooth, fitted_biases, linestyle='--', color='black', label=r'$\frac{a}{n_{\mathrm{samples}}}$')
ax.set_xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
ax.set_ylabel(f"Abs(Bias) in {list(param_labels.values())[i]}", fontsize=plot_config["ylabel_fontsize"])
ax.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
ax.grid(True)
# ax.set_xscale('log')
plt.tight_layout(rect=[0, 0, 1, 0.96])
# Save Plot
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Bias_vs_Sample_Size.png")
plt.show()
plt.close(fig)
print(f"Bias vs Sample Size plot saved in {output_directory}/Trends_with_Samples_Size")
# Sample sizes and biases
biases = [abs(results[sample_size]["Values_Bias"][6]) for sample_size in sample_sizes]
standard_errors = [abs(results[sample_size]["Values_Std"][6]) for sample_size in sample_sizes]/np.sqrt(250)
# Fit the 1/n curve to the data
popt, pcov = curve_fit(bias_curve, sample_sizes, biases)
a_fit = popt[0] # Extract the fitted parameter
# Generate a smooth line for the fitted curve
sample_sizes_smooth = np.linspace(min(sample_sizes), max(sample_sizes), 500)
fitted_biases = bias_curve(sample_sizes_smooth, a_fit)
# Plot Bias vs. Sample Size for just Lambda
plt.figure(figsize=(6, 4))
plt.errorbar(sample_sizes, biases, yerr=standard_errors, fmt='-s', color='red', label='Bias ± Std. Error', capsize=4)
plt.plot(sample_sizes_smooth, fitted_biases, linestyle='--', color='black', label=r'$\frac{a}{n_{\mathrm{samples}}}$')
plt.xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
plt.ylabel(f"Abs(Bias) in {list(param_labels.values())[6]}", fontsize=plot_config["ylabel_fontsize"])
plt.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
plt.grid(True)
plt.legend(fontsize=plot_config["legend_fontsize"])
plt.tight_layout()
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Bias_vs_Sample_Size_Lambda_Fitted.png")
plt.show()
plt.close()
print(f"Bias vs Sample Size plot with fitted curve saved in {output_directory}/Trends_with_Samples_Size_Lambda_Fitted")
# Plot Values_Std vs. Sample Size
fig, axes = plt.subplots(3, 2, figsize=(13, 9))
axes = axes.flatten()
# Plot for all parameters bar N as this is the expected number of events and not comparible
for i in [0,1,2,3,4,6]:
# Determine the uncertainty for each parameter value
values_std = [results[sample_size]["Values_Std"][i] for sample_size in sample_sizes]
errors_mean = [results[sample_size]["Errors_Mean"][i] for sample_size in sample_sizes]
errors_std = [results[sample_size]["Errors_Std"][i] for sample_size in sample_sizes]
if i != 6:
ax = axes[i]
else:
ax = axes[5]
ax.plot(sample_sizes, values_std, marker='s', linestyle='-', color='black', label=r'Fitted Values $\sigma$')
ax.errorbar(sample_sizes, errors_mean, yerr=errors_std, fmt='s', linestyle='-', color='red', label=r'Hessian Errors $\mu \pm \sigma $', capsize=4)
ax.set_xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
ax.set_ylabel(f"Uncertainty in {list(param_labels.values())[i]}", fontsize=plot_config["ylabel_fontsize"])
ax.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
ax.grid(True)
ax.legend(fontsize=plot_config["legend_fontsize"])
# ax.set_xscale('log')
plt.tight_layout(rect=[0, 0, 1, 0.96])
# Save Plot
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Errors_Mean_vs_Sample_Size.png")
plt.show() # Display in Jupyter Notebook
plt.close(fig)
print(f"Error vs Sample Size plot saved in {output_directory}/Trends_with_Samples_Size")
# Plot Values_Std vs. Sample Size
plt.figure(figsize=(6,4))
values_std = [results[sample_size]["Values_Std"][6] for sample_size in sample_sizes]
errors_mean = [results[sample_size]["Errors_Mean"][6] for sample_size in sample_sizes]
errors_std = [results[sample_size]["Errors_Std"][6] for sample_size in sample_sizes]
plt.plot(sample_sizes, values_std, marker='s', linestyle='-', color='black', label=r'Fitted Values $\sigma$')
plt.errorbar(sample_sizes, errors_mean, yerr=errors_std, fmt='s', linestyle='-', color='red', label=r'Hessian Errors $\mu \pm \sigma $', capsize=4)
plt.xlabel("Sample Size", fontsize=plot_config["xlabel_fontsize"])
plt.ylabel(f"Uncertainty in {list(param_labels.values())[6]}", fontsize=plot_config["ylabel_fontsize"])
plt.tick_params(axis='both', which='major', labelsize=plot_config["tick_fontsize"])
plt.grid(True)
plt.legend(fontsize=plot_config["legend_fontsize"])
plt.tight_layout()
# Save Plot
plt.savefig(f"{output_directory}/Trends_with_Samples_Size/Errors_Mean_vs_Sample_Size_Lambda.png")
plt.show() # Display in Jupyter Notebook
plt.close(fig)
print(f"Error vs Sample Size for Lambda only plot saved in {output_directory}/Trends_with_Samples_Size_Lambda.")
# Plot the Pull Distribution for each parameter - with a seperate plot for each sample size
for sample_size, calc_values in results.items():
num_params = len(param_labels)
fig, ax = plt.subplots(figsize=(10, 6))
# Add light grey shading for Pull between 1 and 2
ax.axvspan(-2, -1, color="lightgrey", alpha=0.4)
ax.axvspan(1, 2, color="lightgrey", alpha=0.4)
# Loop over parameters and plot horizontal bars
for i, (param, label) in enumerate(param_labels.items()):
pull_mean = calc_values["Pull_Mean"][i]
pull_std = calc_values["Pull_Std"][i]
pull_mean_error = calc_values["Pull_Mean_Error"][i]
pull_std_error = calc_values["Pull_Std_Error"][i]
# Light blue bar: between the centers of the two red bars
ax.barh(
y=i,
width=2 * pull_std,
left=pull_mean - pull_std,
height=0.4,
color="lightblue",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
# Dark blue region: centered at Pull Mean, between Pull Mean +/- Pull Mean Error
ax.barh(
y=i,
width=pull_mean_error * 2,
left=pull_mean - pull_mean_error,
height=0.4,
color="darkblue",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
# Red regions: centered at Pull Mean ± Pull Std, with widths defined by Pull Std Error
ax.barh(
y=i,
width=pull_std_error * 2,
left=pull_mean - pull_std - pull_std_error,
height=0.4,
color="red",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
ax.barh(
y=i,
width=pull_std_error * 2,
left=pull_mean + pull_std - pull_std_error,
height=0.4,
color="red",
alpha=plot_config["bar_alpha"],
edgecolor="black",
)
# Add labels and grid
ax.set_yticks(range(num_params))
ax.set_yticklabels(list(param_labels.values()), fontsize=plot_config["ylabel_fontsize"])
ax.set_xlabel("Pull", fontsize=plot_config["xlabel_fontsize"])
ax.set_title(f"Pull Distributions for Sample Size {sample_size}", fontsize=plot_config["title_fontsize"])
ax.tick_params(axis="both", which="major", labelsize=plot_config["tick_fontsize"])
ax.axvline(0, color="black", linestyle="--", linewidth=1)
ax.axvline(-2, color="brown", linestyle="dashdot", linewidth=1)
ax.axvline(2, color="brown", linestyle="dashdot", linewidth=1)
ax.grid(True, axis="x")
# Save the plot
save_path = os.path.join(output_directory, f"Pull_Plots/Pull_Distributions_{sample_size}.png")
plt.tight_layout()
plt.savefig(save_path)
plt.close(fig)
print(f"Saved pull distribution plot for sample size {sample_size} in {save_path}")
# Return the results dictionary
return results