/*
* Copyright (c) 2019-2022 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous parallel bundle solver.
#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, RecvError};
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, RecvError};
use log::{debug, info, warn};
use num_traits::{Float, Zero};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use threadpool::ThreadPool;
use crate::{DVector, Minorant, Real, Saveable};
use super::channels::{
ChannelResultSender, ChannelUpdateSender, ClientReceiver, ClientSender, EvalResult, Message, Update,
};
use super::masterprocess::{MasterError, MasterProcess, MasterProcessState, MasterResponse, Response};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// The default solver.
pub type DefaultSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::FullMasterBuilder>;
/// The minimal bundle solver.
pub type NoBundleSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::MinimalMasterBuilder>;
/// Error raised by the parallel bundle [`Solver`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error<PErr, MErr> {
#[error("Master problem creation failed.")]
BuildMaster(#[source] MErr),
#[error("Error solving the master problem.")]
Master(#[source] MasterError<PErr, MErr>),
#[error("Iteration limit ({limit}) has been reached.")]
IterationLimit { limit: usize },
#[error("Problem evaluation failed.")]
Evaluation(PErr),
#[error("Problem update failed.")]
Update(PErr),
#[error("Wrong dimension of `{what}`.")]
Dimension { what: String },
#[error("Invalid variable bounds (lower: {lower}, upper: {upper}).")]
InvalidBounds { lower: Real, upper: Real },
#[error("Disconnected receiving channel to a subprocess.")]
DisconnectedReceiver(#[source] RecvError),
#[error("Solver has not been initialized.")]
NotInitialized,
#[error("Problem has not been solved, yet.")]
NotSolved,
}
unsafe impl<PErr: Sync, MErr: Sync> Sync for Error<PErr, MErr> {}
unsafe impl<PErr: Send, MErr: Send> Send for Error<PErr, MErr> {}
/// The result type of the solver.
///
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
pub type Result<T, P, M> = std::result::Result<
T,
Error<
<P as FirstOrderProblem>::Err,
<<M as MasterBuilder<<P as FirstOrderProblem>::Minorant>>::MasterProblem as MasterProblem<
<P as FirstOrderProblem>::Minorant,
>>::Err,
>,
>;
impl<PErr, MErr> From<MasterError<PErr, MErr>> for Error<PErr, MErr> {
fn from(err: MasterError<PErr, MErr>) -> Error<PErr, MErr> {
Error::Master(err)
}
}
impl<PErr, MErr> From<RecvError> for Error<PErr, MErr> {
fn from(err: RecvError) -> Error<PErr, MErr> {
Error::DisconnectedReceiver(err)
}
}
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))]
pub struct Parameters {
/// The descent step acceptance factors, must be in (0,1).
///
/// The default value is 0.1.
acceptance_factor: Real,
}
impl Default for Parameters {
fn default() -> Self {
Parameters { acceptance_factor: 0.1 }
}
}
impl Parameters {
/// Change the descent step acceptance factor.
///
/// The default value is 0.1.
pub fn set_acceptance_factor(&mut self, acceptance_factor: Real) {
assert!(
acceptance_factor > 0.0 && acceptance_factor < 1.0,
"Descent step acceptance factors must be in (0,1), got: {}",
acceptance_factor
);
self.acceptance_factor = acceptance_factor;
}
}
/// The step type that has been performed.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Step {
/// A null step has been performed.
Null,
/// A descent step has been performed.
Descent,
/// No step but the algorithm has been terminated.
Term,
}
#[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))]
#[derive(Clone, Debug)]
pub struct SolverData {
/// Current center of stability.
cur_y: DVector,
/// Function value in the current point.
cur_val: Real,
/// Function value at the current candidate.
nxt_val: Real,
/// Model value at the current candidate.
nxt_mod: Real,
/// The value of the new minorant in the current center.
new_cutval: Real,
/// The current expected progress.
///
/// This value is actually `cur_val - nxt_val`. We store it separately only
/// for debugging purposes because after a descent step `cur_val` will be
/// changed and we could not see the "old" expected progress anymore that
/// led to the descent step.
expected_progress: Real,
/// Norm of current aggregated subgradient.
sgnorm: Real,
/// The currently used master problem weight.
cur_weight: Real,
/// Maximal number of iterations.
max_iter: usize,
/// Number of iterations.
cnt_iter: usize,
/// Number of inner model updates.
cnt_updates: usize,
/// Function upper bounds in the candidate.
nxt_ubs: Vec<Real>,
/// Number of subproblems without upper bounds.
///
/// There should be running processes to compute these upper bounds.
cnt_remaining_ubs: usize,
/// The model lower bounds (subgradient values) in the candidate.
nxt_cutvals: Vec<Real>,
/// Number of subproblems without model lower bounds.
///
/// There should be running processes to compute at least one lower bound.
cnt_remaining_mins: usize,
/// Step direction (i.e. nxt_y - cur_y).
nxt_d: Arc<DVector>,
/// Current candidate.
nxt_y: Arc<DVector>,
/// True if the problem has been updated after the last evaluation.
///
/// If this value is false there might be a pending model update. The
/// algorithm must not terminate before the model got a chance to update
/// because new variables could be generated.
updated: bool,
}
impl SolverData {
/// Reset solver data to initial values.
///
/// This means that almost everything is set to +infinity so that
/// a null-step is forced after the first evaluation.
fn init(&mut self, y: DVector) {
self.cur_y = y;
self.cur_val = Real::infinity();
self.nxt_val = Real::infinity();
self.nxt_mod = -Real::infinity();
self.new_cutval = -Real::infinity();
self.expected_progress = Real::infinity();
self.sgnorm = Real::infinity();
self.cur_weight = 1.0;
}
}
impl Default for SolverData {
fn default() -> SolverData {
SolverData {
cur_y: dvec![],
cur_val: 0.0,
nxt_val: 0.0,
nxt_mod: 0.0,
new_cutval: 0.0,
expected_progress: 0.0,
sgnorm: 0.0,
cur_weight: 1.0,
max_iter: 0,
cnt_iter: 0,
cnt_updates: 0,
nxt_ubs: vec![],
cnt_remaining_ubs: 0,
nxt_cutvals: vec![],
cnt_remaining_mins: 0,
nxt_d: Arc::new(dvec![]),
nxt_y: Arc::new(dvec![]),
updated: true,
}
}
}
impl StandardTerminatable for SolverData {
fn center_value(&self) -> Real {
self.cur_val
}
fn expected_progress(&self) -> Real {
self.expected_progress
}
}
impl HKWeightable for SolverData {
fn current_weight(&self) -> Real {
self.cur_weight
}
fn center(&self) -> &DVector {
&self.cur_y
}
fn center_value(&self) -> Real {
self.cur_val
}
fn candidate_value(&self) -> Real {
self.nxt_val
}
fn candidate_model(&self) -> Real {
self.nxt_mod
}
fn new_cutvalue(&self) -> Real {
self.new_cutval
}
fn sgnorm(&self) -> Real {
self.sgnorm
}
}
/// Data providing access for updating the problem.
struct UpdateData<Pr>
where
Pr: Send,
{
/// Type of step.
step: Step,
/// Current center of stability.
cur_y: Arc<DVector>,
/// Current candidate.
nxt_y: Arc<DVector>,
/// The (cached) aggregated primals.
primal_aggrs: Vec<Pr>,
}
impl<Pr> UpdateState<Pr> for UpdateData<Pr>
where
Pr: Send + 'static,
{
fn was_descent(&self) -> bool {
self.step == Step::Descent
}
fn center(&self) -> &DVector {
&self.cur_y
}
fn candidate(&self) -> &DVector {
&self.nxt_y
}
fn aggregated_primal(&self, i: usize) -> &Pr {
&self.primal_aggrs[i]
}
}
/// Implementation of a parallel bundle method.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter, M = crate::master::FullMasterBuilder>
where
P: FirstOrderProblem,
M: MasterBuilder<P::Minorant>,
{
/// Parameters for the solver.
pub params: Parameters,
/// Termination predicate.
pub terminator: T,
/// Weighter heuristic.
pub weighter: W,
/// The threadpool of the solver.
pub threadpool: ThreadPool,
/// The master problem builder.
pub master: M,
/// The first order problem.
problem: P,
/// The algorithm data.
data: SolverData,
/// The master problem process.
master_proc: Option<MasterProcess<P, M::MasterProblem>>,
/// The channel to receive the evaluation results from subproblems.
client_tx: Option<ClientSender<usize, P, M::MasterProblem>>,
/// The channel to receive the evaluation results from subproblems.
client_rx: Option<ClientReceiver<usize, P, M::MasterProblem>>,
/// Number of descent steps.
cnt_descent: usize,
/// Number of null steps.
cnt_null: usize,
/// Number of function evaluation.
cnt_evals: usize,
/// Time when the solution process started.
///
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
T: Terminator<SolverData> + Default,
W: Weighter<SolverData> + Default,
M: MasterBuilder<P::Minorant>,
{
/// Create a new parallel bundle solver.
pub fn new(problem: P) -> Self
where
M: Default,
{
Self::with_master(problem, M::default())
}
/// Create a new parallel bundle solver.
pub fn with_master(problem: P, master: M) -> Self {
let ncpus = num_cpus::get();
info!("Initializing synchronous solver with {} CPUs", ncpus);
Solver {
params: Parameters::default(),
terminator: Default::default(),
weighter: Default::default(),
problem,
data: SolverData::default(),
threadpool: ThreadPool::with_name("Synchronous parallel bundle solver".to_string(), ncpus),
master,
master_proc: None,
client_tx: None,
client_rx: None,
cnt_descent: 0,
cnt_null: 0,
cnt_evals: 0,
start_time: Instant::now(),
}
}
/// Return the underlying threadpool.
///
/// In order to use the same threadpool for concurrent processes,
/// just clone the returned `ThreadPool`.
pub fn threadpool(&self) -> &ThreadPool {
&self.threadpool
}
/// Set the threadpool.
///
/// This function allows to use a specific threadpool for all processes
/// spawned by the solver. Note that this does not involve any threads
/// used by the problem because the solver is not responsible for executing
/// the evaluation process of the subproblems. However, the problem might
/// use the same threadpool as the solver.
pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
self.threadpool = threadpool;
}
/// Return the current problem associated with the solver.
pub fn problem(&self) -> &P {
&self.problem
}
/// Return the current problem associated with the solver.
pub fn into_problem(self) -> P {
self.problem
}
/// Initialize the solver.
///
/// This will reset the internal data structures so that a new fresh
/// solution process can be started.
///
/// It will also setup all worker processes.
///
/// This function is automatically called by [`Solver::solve`].
pub fn init(&mut self) -> Result<(), P, M> {
debug!("Initialize solver");
let n = self.problem.num_variables();
let m = self.problem.num_subproblems();
self.data.init(dvec![0.0; n]);
self.cnt_descent = 0;
self.cnt_null = 0;
self.cnt_evals = 0;
let (tx, rx) = channel();
self.client_tx = Some(tx.clone());
self.client_rx = Some(rx);
let lower_bounds = self.problem.lower_bounds().map(DVector);
if lower_bounds.as_ref().map(|lb| lb.len() != n).unwrap_or(false) {
return Err(Error::Dimension {
what: "lower bounds".to_string(),
});
}
let upper_bounds = self.problem.upper_bounds().map(DVector);
if upper_bounds.as_ref().map(|ub| ub.len() != n).unwrap_or(false) {
return Err(Error::Dimension {
what: "upper bounds".to_string(),
});
}
debug!("Start master process");
// Initialize the master problem.
let mut master = self.master.build().map_err(Error::BuildMaster)?;
master.set_num_subproblems(m).map_err(Error::BuildMaster)?;
master
.set_vars(n, lower_bounds, upper_bounds)
.map_err(Error::BuildMaster)?;
self.master_proc = Some(MasterProcess::start(master, tx, &mut self.threadpool));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
.evaluate(
i,
y.clone(),
ChannelResultSender::new(i, self.client_tx.clone().unwrap()),
)
.map_err(Error::Evaluation)?;
}
debug!("Initialization complete");
self.start_time = Instant::now();
Ok(())
}
fn reset_iteration_data(&mut self, max_iter: usize) {
let num_subproblems = self.problem.num_subproblems();
let num_variables = self.problem.num_variables();
self.data.max_iter = max_iter;
self.data.cnt_iter = 0;
self.data.cnt_updates = 0;
self.data.nxt_ubs = vec![Real::infinity(); num_subproblems];
self.data.cnt_remaining_ubs = num_subproblems;
self.data.nxt_cutvals = vec![-Real::infinity(); num_subproblems];
self.data.cnt_remaining_mins = num_subproblems;
self.data.nxt_d = Arc::new(dvec![0.0; num_variables]);
self.data.nxt_y = Arc::new(dvec![]);
self.data.updated = true;
}
/// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
pub fn solve(&mut self) -> Result<(), P, M> {
self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
}
/// Solve the problem with a maximal iteration limit.
pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), P, M> {
// First initialize the internal data structures.
self.init()?;
if self.solve_iter(limit)? {
Ok(())
} else {
Err(Error::IterationLimit { limit })
}
}
/// Solve the problem but stop after at most `niter` iterations.
///
/// The function returns `Ok(true)` if the termination criterion
/// has been satisfied. Otherwise it returns `Ok(false)` or an
/// error code.
///
/// If this function is called again, the solution process is
/// continued from the previous point. In order to start a fresh
/// computation, call `init()` explicitly before calling this
/// method (this is not necessary for the first call).
pub fn solve_iter(&mut self, niter: usize) -> Result<bool, P, M> {
debug!("Start solving up to {} iterations", niter);
if let Some(ref mut master_proc) = self.master_proc {
// Continue the solution process, start with a new master
// problem evaluation.
if !self.data.cur_val.is_infinite() {
master_proc.solve(self.data.cur_val)?;
}
} else {
// No master problem, initialize solver.
self.init()?;
}
self.reset_iteration_data(niter);
loop {
let msg = self.client_rx.as_ref().ok_or(Error::NotInitialized)?.recv()?;
match msg {
Message::Eval(m) => {
if self.handle_client_response(m)? {
return Ok(false);
}
}
Message::Update(_) => warn!("Ignore unexpected problem update message from client"),
Message::Master(mresponse) => {
debug!("Receive master response");
// Receive result (new candidate) from the master
if self.handle_master_response(mresponse)? {
return Ok(true);
}
}
}
}
}
/// Handle a response from a subproblem evaluation.
///
/// The function returns `Ok(true)` if the final iteration count
/// has been reached. In that case the master problem is *not*
/// resolved and must be started again explicitly.
fn handle_client_response(&mut self, msg: EvalResult<usize, P::Minorant, P::Err>) -> Result<bool, P, M> {
let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
match msg {
EvalResult::ObjectiveValue { index, value } => {
debug!("Receive objective from subproblem {}: {}", index, value);
if self.data.nxt_ubs[index].is_infinite() {
self.data.cnt_remaining_ubs -= 1;
}
self.data.nxt_ubs[index] = self.data.nxt_ubs[index].min(value);
}
EvalResult::Minorant { index, mut minorant } => {
debug!("Receive minorant from subproblem {}", index);
if self.data.nxt_cutvals[index].is_infinite() {
self.data.cnt_remaining_mins -= 1;
}
// move center of minorant to cur_y
minorant.move_center(-1.0, &self.data.nxt_d);
self.data.nxt_cutvals[index] = self.data.nxt_cutvals[index].max(minorant.constant());
// add minorant to master problem
master.add_minorant(index, minorant)?;
}
EvalResult::Done { .. } => return Ok(false), // nothing to do here
EvalResult::Error { err, .. } => return Err(Error::Evaluation(err)),
}
if self.data.cnt_remaining_ubs > 0 || self.data.cnt_remaining_mins > 0 {
// Haven't received data from all subproblems, yet.
return Ok(false);
}
// All subproblems have been evaluated, do a step.
let nxt_ub = self.data.nxt_ubs.iter().sum::<Real>();
let descent_bnd = Self::get_descent_bound(self.params.acceptance_factor, &self.data);
self.data.nxt_val = nxt_ub;
self.data.new_cutval = self.data.nxt_cutvals.iter().sum::<Real>();
debug!("Step");
debug!(" cur_val ={}", self.data.cur_val);
debug!(" nxt_mod ={}", self.data.nxt_mod);
debug!(" nxt_ub ={}", nxt_ub);
debug!(" descent_bnd={}", descent_bnd);
self.data.updated = false;
let step;
if self.data.cur_val.is_infinite() {
// This is the first evaluation. We effectively get
// the function value at the current center but
// we do not have a model estimate yet. Hence, we do not know
// a good guess for the weight.
step = Step::Descent;
self.data.cur_val = nxt_ub;
self.data.cur_weight = Real::infinity();
master.set_weight(1.0)?;
self.data.updated = true;
debug!("First Step");
debug!(" cur_val={}", self.data.cur_val);
debug!(" cur_y={}", self.data.cur_y);
} else if nxt_ub <= descent_bnd {
step = Step::Descent;
self.cnt_descent += 1;
// Note that we must update the weight *before* we
// change the internal data, so the old information
// that caused the descent step is still available.
self.data.cur_weight = self.weighter.descent_weight(&self.data);
self.data.cur_y = self.data.nxt_y.as_ref().clone();
self.data.cur_val = nxt_ub;
master.move_center(1.0, self.data.nxt_d.clone(), self.cnt_descent)?;
master.set_weight(self.data.cur_weight)?;
debug!("Descent Step");
debug!(" dir ={}", self.data.nxt_d);
debug!(" newy={}", self.data.cur_y);
} else {
step = Step::Null;
self.cnt_null += 1;
self.data.cur_weight = self.weighter.null_weight(&self.data);
master.set_weight(self.data.cur_weight)?;
}
self.show_info(step);
self.data.cnt_iter += 1;
// Update problem.
if self.update_problem(step)? {
self.data.updated = true;
}
if self.data.cnt_iter < self.data.max_iter {
// Compute the new candidate. The main loop will wait for the result of
// this solution process of the master problem.
self.master_proc.as_mut().unwrap().solve(self.data.cur_val)?;
Ok(false)
} else {
Ok(true)
}
}
fn handle_master_response(&mut self, master_res: MasterResponse<P, M::MasterProblem>) -> Result<bool, P, M> {
match master_res {
Response::Error(err) => return Err(err.into()),
Response::Result {
nxt_mod,
sgnorm,
cnt_updates,
nxt_d,
..
} => {
self.data.nxt_d = Arc::new(nxt_d);
self.data.nxt_mod = nxt_mod;
self.data.sgnorm = sgnorm;
self.data.cnt_updates = cnt_updates;
}
};
self.data.expected_progress = self.data.cur_val - self.data.nxt_mod;
let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
// If this is the very first solution of the model,
// we use its result as to make a good guess for the initial weight
// of the proximal term and resolve.
if self.data.cur_weight.is_infinite() {
self.data.cur_weight = self.weighter.initial_weight(&self.data);
master.set_weight(self.data.cur_weight)?;
master.solve(self.data.cur_val)?;
return Ok(false);
}
if self.terminator.terminate(&self.data) && !self.data.updated {
self.show_info(Step::Term);
info!("Termination criterion satisfied");
return Ok(true);
}
// Compress bundle
master.compress()?;
// Compute new candidate.
let mut next_y = dvec![];
next_y.add(&self.data.cur_y, &self.data.nxt_d);
self.data.nxt_y = Arc::new(next_y);
// Reset evaluation data.
self.data.nxt_ubs.clear();
self.data
.nxt_ubs
.resize(self.problem.num_subproblems(), Real::infinity());
self.data.cnt_remaining_ubs = self.problem.num_subproblems();
self.data.nxt_cutvals.clear();
self.data
.nxt_cutvals
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
.evaluate(
i,
self.data.nxt_y.clone(),
ChannelResultSender::new(i, client_tx.clone()),
)
.map_err(Error::Evaluation)?;
}
Ok(false)
}
fn update_problem(&mut self, step: Step) -> Result<bool, P, M> {
let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
let (update_tx, update_rx) = channel();
self.problem
.update(
UpdateData {
cur_y: Arc::new(self.data.cur_y.clone()),
nxt_y: self.data.nxt_y.clone(),
step,
primal_aggrs: (0..self.problem.num_subproblems())
.map(|i| master_proc.get_aggregated_primal(i))
.map(|p| p.map_err(Error::from))
.collect::<std::result::Result<Vec<_>, _>>()?,
},
ChannelUpdateSender::<_, _, _, <M::MasterProblem as MasterProblem<_>>::Err>::new(
self.data.cnt_iter,
update_tx,
),
)
.map_err(Error::Update)?;
let mut have_update = false;
for msg in update_rx {
if let Message::Update(update) = msg {
match update {
Update::AddVariables { bounds, sgext, .. } => {
have_update = true;
let newvars = bounds
.into_iter()
.map(|(lower, upper)| {
if lower <= upper {
let value = lower.max(Real::zero()).min(upper);
Ok((lower - value, upper - value, value))
} else {
Err(Error::InvalidBounds { lower, upper })
}
})
.collect::<Result<Vec<_>, P, M>>()?;
if !newvars.is_empty() {
// add new variables
self.data.cur_y.extend(newvars.iter().map(|v| v.2));
master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1)).collect(), sgext.into())?;
}
}
Update::Done { .. } => (), // there's nothing to do
Update::Error { err, .. } => return Err(Error::Update(err)),
}
} else {
unreachable!("Only update results allowed during update");
}
}
Ok(have_update)
}
/// Return the bound the function value must be below of to enforce a descent step.
///
/// If the oracle guarantees that $f(\bar{y}) \le$ this bound, the
/// bundle method will perform a descent step.
///
/// This value is $f(\hat{y}) + \varrho \cdot \Delta$ where
/// $\Delta = f(\hat{y}) - \hat{f}(\bar{y})$ is the expected
/// progress and $\varrho$ is the `acceptance_factor`.
fn get_descent_bound(acceptance_factor: Real, data: &SolverData) -> Real {
data.cur_val - acceptance_factor * (data.cur_val - data.nxt_mod)
}
fn show_info(&self, step: Step) {
let time = self.start_time.elapsed();
info!(
"{} {:0>2}:{:0>2}:{:0>2}.{:0>2} {:4} {:4} {:4} {:4}{:1} {:9.4} {:9.4} \
{:12.6e}({:12.6e}) {:12.6e}",
if step == Step::Term { "_endit" } else { "endit " },
time.as_secs() / 3600,
(time.as_secs() / 60) % 60,
time.as_secs() % 60,
time.subsec_nanos() / 10_000_000,
self.cnt_descent,
self.cnt_descent + self.cnt_null,
self.data.cnt_updates,
(self.cnt_descent + self.cnt_null) * self.problem.num_subproblems(),
if step == Step::Descent { "*" } else { " " },
self.data.cur_weight,
self.data.expected_progress(),
self.data.nxt_mod,
self.data.nxt_val,
self.data.cur_val
);
}
/// Return the total number iterations in the last call to a `solve*` method.
pub fn num_iterations(&self) -> usize {
self.data.cnt_iter
}
/// Return the aggregated primal of the given subproblem.
pub fn aggregated_primal(&self, subproblem: usize) -> Result<<P::Minorant as Minorant>::Primal, P, M> {
Ok(self
.master_proc
.as_ref()
.ok_or(Error::NotSolved)?
.get_aggregated_primal(subproblem)?)
}
/// Return the current center of stability.
pub fn center(&self) -> &DVector {
&self.data.cur_y
}
}
#[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))]
#[derive(Clone, Debug)]
pub struct SyncSolverState<T, W, M> {
data: SolverData,
cnt_descent: usize,
cnt_null: usize,
cnt_evals: usize,
terminator_state: T,
weighter_state: W,
master_state: Option<MasterProcessState<M>>,
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SaveError<MErr, SErr, TErr, WErr> {
#[error("Master problem creation failed")]
BuildMaster(#[source] MErr),
#[error("Restoring the master state failed")]
RestoreMasterState(#[source] SErr),
#[error("Getting the master state failed")]
GetMasterState(#[source] SErr),
#[error("Restoring the terminator state failed")]
RestoreTerminatorState(TErr),
#[error("Getting the terminator state failed")]
GetTerminatorState(TErr),
#[error("Restoring the weighter state failed")]
RestoreWeighterState(WErr),
#[error("Getting the weighter state failed")]
GetWeighterState(WErr),
}
impl<P, T, W, M> Saveable for Solver<P, T, W, M>
where
P: FirstOrderProblem,
T: Terminator<SolverData> + Saveable + Default,
W: Weighter<SolverData> + Saveable + Default,
M: MasterBuilder<P::Minorant>,
M::MasterProblem: Saveable,
{
type State = SyncSolverState<T::State, W::State, <M::MasterProblem as Saveable>::State>;
type Err = SaveError<
<M::MasterProblem as MasterProblem<<P as FirstOrderProblem>::Minorant>>::Err,
<M::MasterProblem as Saveable>::Err,
T::Err,
W::Err,
>;
fn set_state(&mut self, state: Self::State) -> std::result::Result<(), Self::Err> {
self.data = state.data;
self.cnt_descent = state.cnt_descent;
self.cnt_null = state.cnt_null;
self.cnt_evals = state.cnt_evals;
self.terminator
.set_state(state.terminator_state)
.map_err(SaveError::RestoreTerminatorState)?;
self.weighter
.set_state(state.weighter_state)
.map_err(SaveError::RestoreWeighterState)?;
let (tx, rx) = channel();
self.client_tx = Some(tx.clone());
self.client_rx = Some(rx);
self.master_proc = if let Some(s) = state.master_state {
let master = self.master.build().map_err(SaveError::BuildMaster)?;
let mut proc = MasterProcess::start(master, tx, &mut self.threadpool);
proc.set_state(s).map_err(SaveError::RestoreMasterState)?;
Some(proc)
} else {
None
};
Ok(())
}
/// Return the current solver state to the given writer.
fn get_state(&self) -> std::result::Result<Self::State, Self::Err> {
Ok(SyncSolverState {
data: self.data.clone(),
cnt_descent: self.cnt_descent,
cnt_null: self.cnt_null,
cnt_evals: self.cnt_evals,
terminator_state: self.terminator.get_state().map_err(SaveError::GetTerminatorState)?,
weighter_state: self.weighter.get_state().map_err(SaveError::GetWeighterState)?,
master_state: if let Some(m) = self.master_proc.as_ref() {
Some(m.get_state().map_err(SaveError::GetMasterState)?)
} else {
None
},
})
}
}