// RsBundle — sync.rs at [a85a975232]

// File src/solver/sync.rs, artifact 285097b209, part of check-in a85a975232


/*
 * Copyright (c) 2019-2023 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! An asynchronous parallel bundle solver.

#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, RecvError};
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};

#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, RecvError};

use log::{debug, info, warn};
use num_traits::{Float, FromPrimitive, One, Zero};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use threadpool::ThreadPool;

use crate::saveable::Saveable;
use crate::{DVector, Minorant, Real};

use super::channels::{
    ChannelResultSender, ChannelUpdateSender, ClientReceiver, ClientSender, EvalResult, Message, Update,
};
use super::masterprocess::{MasterError, MasterProcess, MasterResponse, Response};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.
///
/// Combines the standard terminator and the HK weighter heuristic with
/// the full master problem builder.
pub type DefaultSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::FullMasterBuilder>;

/// The minimal bundle solver.
///
/// Same heuristics as [`DefaultSolver`] but with the minimal master
/// problem builder.
pub type NoBundleSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::MinimalMasterBuilder>;

/// Error raised by the parallel bundle [`Solver`].
///
/// `PErr` is the error type of the first order problem and `MErr` the
/// error type of the master problem.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error<PErr, MErr> {
    /// Building the master problem via the master builder failed.
    #[error("Master problem creation failed.")]
    BuildMaster(#[source] MErr),
    /// The master problem process reported an error while solving.
    #[error("Error solving the master problem.")]
    Master(#[source] MasterError<PErr, MErr>),
    /// The configured [`Limit`] was hit before termination.
    #[error("Iteration limit ({limit}) has been reached.")]
    IterationLimit { limit: self::Limit },
    // WARNING: do *not* add #[source] to `PErr` fields! This might
    // not implement `StdError` and would cause `Error` to not
    // implement `StdError` as well!!!
    /// A subproblem evaluation returned an error.
    #[error("Problem evaluation failed.")]
    Evaluation(PErr),
    /// A problem update returned an error.
    #[error("Problem update failed.")]
    Update(PErr),
    /// A vector supplied by the problem has the wrong length.
    #[error("Wrong dimension of `{what}`.")]
    Dimension { what: String },
    /// A variable's lower bound exceeds its upper bound.
    #[error("Invalid variable bounds (lower: {lower}, upper: {upper}).")]
    InvalidBounds { lower: Real, upper: Real },
    /// Convexity-constrained variables must have a zero lower bound.
    #[error("Lower bound for convexity constraint variable with index {0} must be zero.")]
    InvalidConvexityLowerBound(usize),
    /// Convexity-constrained variables must be unbounded from above.
    #[error("Convexity constraint variable with index {0} must be unbounded from above.")]
    InvalidConvexityUpperBound(usize),
    /// A worker/master channel hung up unexpectedly.
    #[error("Disconnected receiving channel to a subprocess.")]
    DisconnectedReceiver(#[source] RecvError),
    /// An operation requiring [`Solver::init`] was called too early.
    #[error("Solver has not been initialized.")]
    NotInitialized,
    /// A result was requested before the problem was solved.
    #[error("Problem has not been solved, yet.")]
    NotSolved,
}

// NOTE(review): these manual `unsafe impl`s use the same bounds the compiler
// would require for the auto traits, so for the fields visible here (String,
// Real, usize, RecvError) they appear redundant — presumably they exist
// because something inside `MasterError` blocks the automatic derivation.
// Verify that, and prefer removing them if the auto impls suffice: a manual
// `unsafe impl` silently stays in force even if a future variant gains a
// genuinely non-thread-safe field.
unsafe impl<PErr: Sync, MErr: Sync> Sync for Error<PErr, MErr> {}
unsafe impl<PErr: Send, MErr: Send> Send for Error<PErr, MErr> {}

/// The result type of the solver.
///
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
///
/// The error type is [`Error`] instantiated with the problem's error
/// type and the error type of the master problem produced by `M`.
pub type Result<T, P, M> = std::result::Result<
    T,
    Error<
        <P as FirstOrderProblem>::Err,
        <<M as MasterBuilder<<P as FirstOrderProblem>::Minorant>>::MasterProblem as MasterProblem<
            <P as FirstOrderProblem>::Minorant,
        >>::Err,
    >,
>;

impl<PErr, MErr> From<MasterError<PErr, MErr>> for Error<PErr, MErr> {
    fn from(err: MasterError<PErr, MErr>) -> Error<PErr, MErr> {
        Error::Master(err)
    }
}

impl<PErr, MErr> From<RecvError> for Error<PErr, MErr> {
    fn from(err: RecvError) -> Error<PErr, MErr> {
        Error::DisconnectedReceiver(err)
    }
}

/// Limit for a solver run.
///
/// Passed to [`Solver::solve_with_limit`] / [`Solver::solve_iter`];
/// checked after each completed step.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))]
pub enum Limit {
    /// Maximal number of iterations, i.e. null steps and descent steps.
    NumIters(usize),
    /// Maximal number of descent steps.
    NumDescents(usize),
    /// Maximal time duration.
    ///
    /// Measured from the start of the current partial solve (see
    /// `reset_iteration_data`), not from `init`.
    Time(Duration),
}

impl Default for Limit {
    fn default() -> Limit {
        Limit::NumDescents(10_000)
    }
}

impl std::fmt::Display for Limit {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Limit::NumIters(nsteps) => write!(f, "MaxIters: {}", nsteps),
            Limit::NumDescents(nsteps) => write!(f, "MaxDescent: {}", nsteps),
            Limit::Time(dur) => write!(f, "Time: {:?}", dur),
        }
    }
}

impl Limit {
    /// Check whether this limit has been exceeded by `solver`'s progress.
    ///
    /// `Time` limits are measured against the start of the current
    /// partial solve (`this_start_time`).
    fn reached<P, T, W, M>(&self, solver: &Solver<P, T, W, M>) -> bool
    where
        P: FirstOrderProblem,
        M: MasterBuilder<P::Minorant>,
    {
        match self {
            Limit::NumIters(max_iter) => solver.data.cnt_iter >= *max_iter,
            Limit::NumDescents(max_descent) => solver.cnt_descent >= *max_descent,
            Limit::Time(dur) => solver.this_start_time.elapsed() >= *dur,
        }
    }
}

/// Tunable parameters of the bundle [`Solver`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))]
pub struct Parameters {
    /// The descent step acceptance factors, must be in (0,1).
    ///
    /// The default value is 0.1.
    acceptance_factor: Real,
}

impl Default for Parameters {
    fn default() -> Self {
        Parameters { acceptance_factor: 0.1 }
    }
}

impl Parameters {
    /// Return the current descent step acceptance factor.
    ///
    /// Added for symmetry with the setter: the field is private, so
    /// callers previously had no way to read the configured value back.
    pub fn acceptance_factor(&self) -> Real {
        self.acceptance_factor
    }

    /// Change the descent step acceptance factor.
    ///
    /// The default value is 0.1.
    ///
    /// # Panics
    ///
    /// Panics if `acceptance_factor` is not strictly between 0 and 1.
    /// NaN is rejected as well, because both comparisons are false for it.
    pub fn set_acceptance_factor(&mut self, acceptance_factor: Real) {
        assert!(
            acceptance_factor > 0.0 && acceptance_factor < 1.0,
            "Descent step acceptance factors must be in (0,1), got: {}",
            acceptance_factor
        );
        self.acceptance_factor = acceptance_factor;
    }
}

/// The step type that has been performed.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Step {
    /// A null step has been performed (candidate rejected, model refined).
    Null,
    /// A descent step has been performed (center moved to the candidate).
    Descent,
    /// No step but the algorithm has been terminated.
    Term,
}

/// Mutable per-run state of the bundle algorithm.
///
/// Kept in one struct so the terminator and weighter heuristics can
/// inspect it through the `StandardTerminatable` / `HKWeightable`
/// trait impls below.
#[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))]
#[derive(Clone, Debug)]
pub struct SolverData {
    /// Current center of stability.
    cur_y: DVector,

    /// Function value in the current point.
    cur_val: Real,

    /// Function value at the current candidate.
    nxt_val: Real,

    /// Model value at the current candidate.
    nxt_mod: Real,

    /// The value of the new minorant in the current center.
    new_cutval: Real,

    /// The current expected progress.
    ///
    /// This value is actually `cur_val - nxt_val`. We store it separately only
    /// for debugging purposes because after a descent step `cur_val` will be
    /// changed and we could not see the "old" expected progress anymore that
    /// led to the descent step.
    expected_progress: Real,

    /// Norm of current aggregated subgradient.
    sgnorm: Real,

    /// The currently used master problem weight.
    cur_weight: Real,

    /// Iteration limit.
    limit: Limit,

    /// Number of iterations.
    cnt_iter: usize,

    /// Number of inner model updates.
    cnt_updates: usize,

    /// Function upper bounds in the candidate.
    ///
    /// One entry per subproblem, initialized to +inf until the first
    /// objective value arrives.
    nxt_ubs: Vec<Real>,

    /// Number of subproblems without upper bounds.
    ///
    /// There should be running processes to compute these upper bounds.
    cnt_remaining_ubs: usize,
    /// The model lower bounds (subgradient values) in the candidate.
    ///
    /// One entry per subproblem, initialized to -inf until the first
    /// minorant arrives.
    nxt_cutvals: Vec<Real>,
    /// Number of subproblems without model lower bounds.
    ///
    /// There should be running processes to compute at least one lower bound.
    cnt_remaining_mins: usize,

    /// Step direction (i.e. nxt_y - cur_y).
    nxt_d: Arc<DVector>,

    /// Current candidate.
    nxt_y: Arc<DVector>,

    /// True if the problem has been updated after the last evaluation.
    ///
    /// If this value is false there might be a pending model update. The
    /// algorithm must not terminate before the model got a chance to update
    /// because new variables could be generated.
    updated: bool,
}

impl SolverData {
    /// Reset solver data to initial values.
    ///
    /// Sets the bookkeeping values to plus/minus infinity so that the
    /// first evaluation necessarily triggers the "first step" branch of
    /// the algorithm; `y` becomes the initial center of stability.
    fn init(&mut self, y: DVector) {
        let inf = Real::infinity();
        self.cur_y = y;
        self.cur_val = inf;
        self.nxt_val = inf;
        self.nxt_mod = -inf;
        self.new_cutval = -inf;
        self.expected_progress = inf;
        self.sgnorm = inf;
        self.cur_weight = 1.0;
    }
}

impl Default for SolverData {
    /// Empty solver state; `SolverData::init` and
    /// `reset_iteration_data` fill in the real values before a run.
    fn default() -> SolverData {
        SolverData {
            // Center, candidate and direction start as empty vectors.
            cur_y: dvec![],
            nxt_d: Arc::new(dvec![]),
            nxt_y: Arc::new(dvec![]),
            // Scalar bookkeeping values.
            cur_val: 0.0,
            nxt_val: 0.0,
            nxt_mod: 0.0,
            new_cutval: 0.0,
            expected_progress: 0.0,
            sgnorm: 0.0,
            cur_weight: 1.0,
            // Iteration bookkeeping.
            limit: Limit::default(),
            cnt_iter: 0,
            cnt_updates: 0,
            nxt_ubs: Vec::new(),
            cnt_remaining_ubs: 0,
            nxt_cutvals: Vec::new(),
            cnt_remaining_mins: 0,
            updated: true,
        }
    }
}

/// Give the standard termination criterion read access to the solver state.
impl StandardTerminatable for SolverData {
    // Expected progress `cur_val - nxt_val` (stored separately for debugging).
    fn expected_progress(&self) -> Real {
        self.expected_progress
    }

    // Function value at the current center of stability.
    fn center_value(&self) -> Real {
        self.cur_val
    }
}

/// Give the HK weight-update heuristic read access to the solver state.
impl HKWeightable for SolverData {
    // Current proximal-term weight of the master problem.
    fn current_weight(&self) -> Real {
        self.cur_weight
    }

    // Current center of stability.
    fn center(&self) -> &DVector {
        &self.cur_y
    }

    // Function value at the center.
    fn center_value(&self) -> Real {
        self.cur_val
    }

    // Function value at the candidate.
    fn candidate_value(&self) -> Real {
        self.nxt_val
    }

    // Model value at the candidate.
    fn candidate_model(&self) -> Real {
        self.nxt_mod
    }

    // Value of the newest minorant in the current center.
    fn new_cutvalue(&self) -> Real {
        self.new_cutval
    }

    // Norm of the current aggregated subgradient.
    fn sgnorm(&self) -> Real {
        self.sgnorm
    }
}

/// Data providing access for updating the problem.
///
/// A snapshot of the solver state handed to the problem's update hook
/// through the [`UpdateState`] trait; `Pr` is the primal aggregate type.
struct UpdateData<Pr>
where
    Pr: Send,
{
    /// Type of step.
    step: Step,

    /// Current center of stability.
    cur_y: Arc<DVector>,

    /// Current candidate.
    nxt_y: Arc<DVector>,

    /// The (cached) aggregated primals.
    primal_aggrs: Vec<Pr>,
}

/// Expose the snapshot to the problem's update hook.
impl<Pr> UpdateState<Pr> for UpdateData<Pr>
where
    Pr: Send + 'static,
{
    /// Whether the last step moved the center of stability.
    fn was_descent(&self) -> bool {
        matches!(self.step, Step::Descent)
    }

    /// Current center of stability.
    fn center(&self) -> &DVector {
        self.cur_y.as_ref()
    }

    /// Current candidate point.
    fn candidate(&self) -> &DVector {
        self.nxt_y.as_ref()
    }

    /// Cached aggregated primal of subproblem `i`.
    fn aggregated_primal(&self, i: usize) -> &Pr {
        &self.primal_aggrs[i]
    }
}

/// Implementation of a parallel bundle method.
///
/// Type parameters:
/// - `P`: the first order problem to be solved,
/// - `T`: the termination criterion,
/// - `W`: the weight-update heuristic,
/// - `M`: the builder producing the master problem.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter, M = crate::master::FullMasterBuilder>
where
    P: FirstOrderProblem,
    M: MasterBuilder<P::Minorant>,
{
    /// Parameters for the solver.
    pub params: Parameters,

    /// Termination predicate.
    pub terminator: T,

    /// Weighter heuristic.
    pub weighter: W,

    /// The threadpool of the solver.
    pub threadpool: ThreadPool,

    /// The master problem builder.
    pub master: M,

    /// The first order problem.
    problem: P,

    /// The algorithm data.
    data: SolverData,

    /// The master problem process.
    ///
    /// `None` until `init` has been called.
    master_proc: Option<MasterProcess<P, M::MasterProblem>>,

    /// The channel to receive the evaluation results from subproblems.
    client_tx: Option<ClientSender<usize, P, M::MasterProblem>>,

    /// The channel to receive the evaluation results from subproblems.
    client_rx: Option<ClientReceiver<usize, P, M::MasterProblem>>,

    /// Number of descent steps.
    cnt_descent: usize,

    /// Number of null steps.
    cnt_null: usize,

    /// Number of function evaluation.
    cnt_evals: usize,

    /// Time when the solution process started.
    ///
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,

    /// Time when the this partial solution process started.
    ///
    /// This is actually the time of the last call to `reset_iteration_data`,
    /// which is called at the start of `solve_iter`.
    this_start_time: Instant,

    /// Time when the last master problem evaluation started.
    start_master_time: Instant,

    /// Time when the last subproblem evaluation started.
    start_eval_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder<P::Minorant>,
{
    /// Create a new parallel bundle solver with a default master builder.
    pub fn new(problem: P) -> Self
    where
        M: Default,
    {
        let master = M::default();
        Self::with_master(problem, master)
    }

    /// Create a new parallel bundle solver.
    pub fn with_master(problem: P, master: M) -> Self {
        let ncpus = num_cpus::get();
        info!("Initializing synchronous solver with {} CPUs", ncpus);
        Solver {
            params: Parameters::default(),
            terminator: Default::default(),
            weighter: Default::default(),
            problem,
            data: SolverData::default(),

            threadpool: ThreadPool::with_name("Synchronous parallel bundle solver".to_string(), ncpus),
            master,
            master_proc: None,
            client_tx: None,
            client_rx: None,

            cnt_descent: 0,
            cnt_null: 0,
            cnt_evals: 0,

            start_time: Instant::now(),
            this_start_time: Instant::now(),
            start_master_time: Instant::now(),
            start_eval_time: Instant::now(),
        }
    }

    /// Return the underlying threadpool.
    ///
    /// In order to use the same threadpool for concurrent processes,
    /// just clone the returned `ThreadPool`.
    pub fn threadpool(&self) -> &ThreadPool {
        &self.threadpool
    }

    /// Set the threadpool.
    ///
    /// This function allows to use a specific threadpool for all processes
    /// spawned by the solver. Note that this does not involve any threads
    /// used by the problem because the solver is not responsible for executing
    /// the evaluation process of the subproblems. However, the problem might
    /// use the same threadpool as the solver.
    pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
        self.threadpool = threadpool;
    }

    /// Return a shared reference to the problem associated with the solver.
    pub fn problem(&self) -> &P {
        &self.problem
    }

    /// Consume the solver and return ownership of the problem.
    ///
    /// (The previous doc was a copy-paste of [`Solver::problem`]; this
    /// method destroys the solver, it does not just borrow.)
    pub fn into_problem(self) -> P {
        self.problem
    }

    /// Initialize the solver.
    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`Solver::solve`].
    ///
    /// # Errors
    ///
    /// Fails if the problem reports bound vectors of the wrong length,
    /// if a convexity-constrained variable has invalid bounds, or if
    /// building the master problem or an initial evaluation fails.
    pub fn init(&mut self) -> Result<(), P, M> {
        debug!("Initialize solver");

        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();

        // Fresh channel pair; all subproblem and master results are
        // funneled through it into the main loop.
        let (tx, rx) = channel();
        self.client_tx = Some(tx.clone());
        self.client_rx = Some(rx);

        // Bound vectors are optional, but if present must match the
        // number of variables.
        let lower_bounds = self.problem.lower_bounds().map(DVector);
        if lower_bounds.as_ref().map(|lb| lb.len() != n).unwrap_or(false) {
            return Err(Error::Dimension {
                what: "lower bounds".to_string(),
            });
        }

        let upper_bounds = self.problem.upper_bounds().map(DVector);
        if upper_bounds.as_ref().map(|ub| ub.len() != n).unwrap_or(false) {
            return Err(Error::Dimension {
                what: "upper bounds".to_string(),
            });
        }

        // Collect convexity sets and validate the bounds of their
        // variables: lower bound must be exactly 0 (note the
        // `unwrap_or(true)`: absent lower bounds are rejected, too),
        // upper bound must be +inf (absent upper bounds are fine).
        let mut constraint_sets = HashMap::new();
        for i in 0..n {
            if let Some(k) = self.problem.constraint_index(i) {
                if lower_bounds.as_ref().map(|lb| lb[i] != 0.0).unwrap_or(true) {
                    return Err(Error::InvalidConvexityLowerBound(i));
                }
                if upper_bounds
                    .as_ref()
                    .map(|ub| ub[i] != Real::infinity())
                    .unwrap_or(false)
                {
                    return Err(Error::InvalidConvexityUpperBound(i));
                }
                constraint_sets.entry(k).or_insert_with(Vec::new).push(i);
            }
        }

        // compute initial y: zero everywhere, except each convexity set
        // gets uniform values summing to one.
        let mut init_y = dvec![Real::zero(); n];
        for convexity_set in constraint_sets.values() {
            let value = Real::one() / Real::from_usize(convexity_set.len()).unwrap();
            for &i in convexity_set {
                init_y[i] = value;
            }
        }

        self.data.init(init_y);
        self.cnt_descent = 0;
        self.cnt_null = 0;
        self.cnt_evals = 0;

        debug!("Start master process");

        // Initialize the master problem.
        let mut master = self.master.build().map_err(Error::BuildMaster)?;
        master.set_num_subproblems(m).map_err(Error::BuildMaster)?;
        master
            .set_vars(n, lower_bounds, upper_bounds)
            .map_err(Error::BuildMaster)?;
        master.set_convexity_sets(constraint_sets.into_values());
        // we must set the center correctly
        master.move_center(1.0, &self.data.cur_y);

        self.master_proc = Some(MasterProcess::start(master, tx, &mut self.threadpool));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(
                    i,
                    y.clone(),
                    ChannelResultSender::new(i, self.client_tx.clone().unwrap()),
                )
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();

        Ok(())
    }

    /// Prepare the per-run bookkeeping for a fresh `solve_iter` call.
    ///
    /// Candidate data is reset as if no subproblem had reported anything
    /// yet (upper bounds +inf, cut values -inf, all counters full), and
    /// the per-run clock used by `Limit::Time` is restarted.
    fn reset_iteration_data(&mut self, limit: Limit) {
        let m = self.problem.num_subproblems();
        let n = self.problem.num_variables();

        let data = &mut self.data;
        data.limit = limit;
        data.cnt_iter = 0;
        data.cnt_updates = 0;
        data.nxt_ubs = vec![Real::infinity(); m];
        data.cnt_remaining_ubs = m;
        data.nxt_cutvals = vec![-Real::infinity(); m];
        data.cnt_remaining_mins = m;
        data.nxt_d = Arc::new(dvec![0.0; n]);
        data.nxt_y = Arc::new(dvec![]);
        data.updated = true;

        self.this_start_time = Instant::now();
    }

    /// Solve the problem with the default limit ([`Limit::default`]),
    /// i.e. a maximal number of *descent* steps.
    ///
    /// (The previous doc referred to [`DEFAULT_ITERATION_LIMIT`], but the
    /// limit actually applied here is `Limit::default()`, which counts
    /// descent steps, not iterations.)
    pub fn solve(&mut self) -> Result<(), P, M> {
        self.solve_with_limit(Limit::default())
    }

    /// Solve the problem with a maximal iteration limit.
    ///
    /// This (re-)initializes the solver, discarding any previous solution
    /// state; use [`Solver::solve_iter`] to continue a previous run instead.
    pub fn solve_with_limit(&mut self, limit: Limit) -> Result<(), P, M> {
        // First initialize the internal data structures.
        self.init()?;
        self.solve_iter(limit)
    }

    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. In order to start a fresh
    /// computation, call `init()` explicitly before calling this
    /// method (this is not necessary for the first call).
    ///
    /// The method runs a message loop: it blocks on the shared channel
    /// and dispatches subproblem evaluation results and master problem
    /// responses until the terminator or the limit stops the run.
    pub fn solve_iter(&mut self, limit: Limit) -> Result<(), P, M> {
        debug!("Start solving up to {}", limit);

        if let Some(ref mut master_proc) = self.master_proc {
            // Continue the solution process, start with a new master
            // problem evaluation.
            // (`cur_val` is infinite only before the very first
            // evaluation round has completed; in that case the initial
            // evaluations triggered by `init` are still pending.)
            if !self.data.cur_val.is_infinite() {
                self.start_master_time = Instant::now();
                master_proc.solve(self.data.cur_val)?;
            }
        } else {
            // No master problem, initialize solver.
            self.init()?;
        }

        self.reset_iteration_data(limit);

        loop {
            // Block until the next message from a worker or the master.
            let msg = self.client_rx.as_ref().ok_or(Error::NotInitialized)?.recv()?;
            match msg {
                Message::Eval(m) => self.handle_client_response(m)?,
                Message::Update(_) => warn!("Ignore unexpected problem update message from client"),
                Message::Master(mresponse) => {
                    debug!(
                        "Receive master response (time: {})",
                        self.start_master_time.elapsed().as_millis() as f64 / 1000.0
                    );
                    // Receive result (new candidate) from the master
                    if self.handle_master_response(mresponse)? {
                        return Ok(());
                    }
                }
            }
        }
    }

    /// Handle a response from a subproblem evaluation.
    ///
    /// Records the reported objective value or minorant. Once *all*
    /// subproblems have reported both, decides between a descent step
    /// and a null step and restarts the master problem.
    ///
    /// The function returns [`Error::IterationLimit`] if the iteration limit has
    /// been reached. In that case the master problem is *not*
    /// resolved and must be started again explicitly.
    fn handle_client_response(&mut self, msg: EvalResult<usize, P::Minorant, P::Err>) -> Result<(), P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        match msg {
            EvalResult::ObjectiveValue { index, value } => {
                debug!("Receive objective from subproblem {}: {}", index, value);
                // First value for this subproblem? Entries start at +inf,
                // so `is_infinite` marks "not yet reported".
                if self.data.nxt_ubs[index].is_infinite() {
                    self.data.cnt_remaining_ubs -= 1;
                }
                self.data.nxt_ubs[index] = self.data.nxt_ubs[index].min(value);
            }
            EvalResult::Minorant { index, mut minorant } => {
                debug!("Receive minorant from subproblem {}", index);
                // Entries start at -inf; same "not yet reported" trick.
                if self.data.nxt_cutvals[index].is_infinite() {
                    self.data.cnt_remaining_mins -= 1;
                }
                // move center of minorant to cur_y
                minorant.move_center(-1.0, &self.data.nxt_d);
                self.data.nxt_cutvals[index] = self.data.nxt_cutvals[index].max(minorant.constant());
                // add minorant to master problem
                master.add_minorant(index, minorant)?;
            }
            EvalResult::Done { .. } => return Ok(()), // nothing to do here
            EvalResult::Error { err, .. } => return Err(Error::Evaluation(err)),
        }

        if self.data.cnt_remaining_ubs > 0 || self.data.cnt_remaining_mins > 0 {
            // Haven't received data from all subproblems, yet.
            return Ok(());
        }

        // All subproblems have been evaluated, do a step.
        // Candidate value is the sum of the per-subproblem upper bounds.
        let nxt_ub = self.data.nxt_ubs.iter().sum::<Real>();
        let descent_bnd = Self::get_descent_bound(self.params.acceptance_factor, &self.data);

        self.data.nxt_val = nxt_ub;
        self.data.new_cutval = self.data.nxt_cutvals.iter().sum::<Real>();

        debug!(
            "Step (time: {})",
            self.start_eval_time.elapsed().as_millis() as f64 / 1000.0
        );
        debug!("  cur_val    ={}", self.data.cur_val);
        debug!("  nxt_mod    ={}", self.data.nxt_mod);
        debug!("  nxt_ub     ={}", nxt_ub);
        debug!("  descent_bnd={}", descent_bnd);

        // The model is a lower approximation; its value exceeding the
        // function value (beyond a tolerance) indicates an oracle bug.
        if self.data.nxt_mod > nxt_ub + 1e-9 {
            warn!(
                "Function value ({}) in candidate higher than model value ({})",
                nxt_ub, self.data.nxt_mod
            );
        }

        self.data.updated = false;
        let step;
        if self.data.cur_val.is_infinite() {
            // This is the first evaluation. We effectively get
            // the function value at the current center but
            // we do not have a model estimate yet. Hence, we do not know
            // a good guess for the weight.
            step = Step::Descent;
            self.data.cur_val = nxt_ub;
            // The infinite weight signals `handle_master_response` to
            // compute an initial weight after the first master solve.
            self.data.cur_weight = Real::infinity();
            master.set_weight(1.0)?;

            self.data.updated = true;

            debug!("First Step");
            debug!("  cur_val={}", self.data.cur_val);
            debug!("  cur_y={:?}", self.data.cur_y);
        } else if nxt_ub <= descent_bnd {
            step = Step::Descent;
            self.cnt_descent += 1;

            // Note that we must update the weight *before* we
            // change the internal data, so the old information
            // that caused the descent step is still available.
            self.data.cur_weight = self.weighter.descent_weight(&self.data);
            self.data.cur_y = self.data.nxt_y.as_ref().clone();
            self.data.cur_val = nxt_ub;

            master.move_center(1.0, self.data.nxt_d.clone(), self.cnt_descent)?;
            master.set_weight(self.data.cur_weight)?;

            debug!("Descent Step");
            debug!("  dir ={}", self.data.nxt_d);
            debug!("  newy={}", self.data.cur_y);
        } else {
            // Candidate not good enough: null step, only the weight and
            // the model change.
            step = Step::Null;
            self.cnt_null += 1;
            self.data.cur_weight = self.weighter.null_weight(&self.data);
            master.set_weight(self.data.cur_weight)?;
        }

        self.show_info(step);
        self.data.cnt_iter += 1;

        // Update problem.
        if self.update_problem(step)? {
            self.data.updated = true;
        }

        if !self.data.limit.reached(self) {
            // Compute the new candidate. The main loop will wait for the result of
            // this solution process of the master problem.
            self.start_master_time = Instant::now();
            self.master_proc.as_mut().unwrap().solve(self.data.cur_val)?;
            Ok(())
        } else {
            Err(Error::IterationLimit { limit: self.data.limit })
        }
    }

    /// Handle a response from a master problem computation.
    ///
    /// Stores the new candidate direction and model data, checks the
    /// termination criterion, and — if the run continues — compresses
    /// the bundle and starts the evaluation of all subproblems at the
    /// new candidate.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied and `Ok(false)` otherwise (except for an
    /// error).
    fn handle_master_response(&mut self, master_res: MasterResponse<P, M::MasterProblem>) -> Result<bool, P, M> {
        match master_res {
            Response::Error(err) => return Err(err.into()),
            Response::Result {
                nxt_mod,
                sgnorm,
                cnt_updates,
                nxt_d,
                ..
            } => {
                self.data.nxt_d = Arc::new(nxt_d);
                self.data.nxt_mod = nxt_mod;
                self.data.sgnorm = sgnorm;
                self.data.cnt_updates = cnt_updates;
            }
        };

        self.data.expected_progress = self.data.cur_val - self.data.nxt_mod;

        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;

        // If this is the very first solution of the model,
        // we use its result as to make a good guess for the initial weight
        // of the proximal term and resolve.
        // (`cur_weight` was set to infinity by the first-step branch of
        // `handle_client_response`.)
        if self.data.cur_weight.is_infinite() {
            self.data.cur_weight = self.weighter.initial_weight(&self.data);
            master.set_weight(self.data.cur_weight)?;
            master.solve(self.data.cur_val)?;
            return Ok(false);
        }

        // Only terminate if no problem update is pending — an update may
        // still generate new variables (see `SolverData::updated`).
        if self.terminator.terminate(&self.data) && !self.data.updated {
            self.show_info(Step::Term);
            info!("Termination criterion satisfied");
            return Ok(true);
        }

        // Compress bundle
        master.compress()?;

        // Compute new candidate: nxt_y = cur_y + nxt_d.
        let mut next_y = dvec![];
        next_y.add(&self.data.cur_y, &self.data.nxt_d);
        self.data.nxt_y = Arc::new(next_y);

        // Reset evaluation data.
        self.data.nxt_ubs.clear();
        self.data
            .nxt_ubs
            .resize(self.problem.num_subproblems(), Real::infinity());
        self.data.cnt_remaining_ubs = self.problem.num_subproblems();
        self.data.nxt_cutvals.clear();
        self.data
            .nxt_cutvals
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        debug!("Evaluate");
        debug!("  nxt_y={:?}", self.data.nxt_y);
        self.start_eval_time = Instant::now();
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(
                    i,
                    self.data.nxt_y.clone(),
                    ChannelResultSender::new(i, client_tx.clone()),
                )
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    /// Update the problem.
    ///
    /// Calls the problem's update hook with a snapshot of the current
    /// state and applies the requested modifications (new variables or
    /// new subproblems) to the solver and the master process.
    ///
    /// Returns `Ok(true)` if the problem has been modified and
    /// `Ok(false)` otherwise (except for an error).
    fn update_problem(&mut self, step: Step) -> Result<bool, P, M> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        // Dedicated channel: update messages must not mix with the main
        // message loop on `client_rx`.
        let (update_tx, update_rx) = channel();
        self.problem
            .update(
                UpdateData {
                    cur_y: Arc::new(self.data.cur_y.clone()),
                    nxt_y: self.data.nxt_y.clone(),
                    step,
                    primal_aggrs: (0..self.problem.num_subproblems())
                        .map(|i| master_proc.get_aggregated_primal(i))
                        .map(|p| p.map_err(Error::from))
                        .collect::<std::result::Result<Vec<_>, _>>()?,
                },
                ChannelUpdateSender::<_, _, _, <M::MasterProblem as MasterProblem<_>>::Err>::new(
                    self.data.cnt_iter,
                    update_tx,
                ),
            )
            .map_err(Error::Update)?;

        // Drain the channel until the problem drops its sender.
        let mut have_update = false;
        for msg in update_rx {
            if let Message::Update(update) = msg {
                match update {
                    Update::AddVariables { bounds, sgext, .. } => {
                        have_update = true;
                        // Choose an initial value inside the bounds and
                        // shift the bounds relative to it.
                        let newvars = bounds
                            .into_iter()
                            .map(|(lower, upper)| {
                                if lower <= upper {
                                    let value = lower.clamp(Real::zero(), upper);
                                    Ok((lower - value, upper - value, value))
                                } else {
                                    Err(Error::InvalidBounds { lower, upper })
                                }
                            })
                            .collect::<Result<Vec<_>, P, M>>()?;
                        if !newvars.is_empty() {
                            // add new variables
                            self.data.cur_y.extend(newvars.iter().map(|v| v.2));

                            master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1)).collect(), sgext.into())?;
                        }
                    }
                    Update::AddSubproblems { minorants } => {
                        have_update = true;
                        let nnew = minorants.len();
                        master_proc.add_subproblems(&minorants)?;
                        // Extend the per-subproblem bookkeeping for the
                        // next evaluation round.
                        self.data.nxt_ubs.extend(std::iter::repeat(Real::infinity()).take(nnew));
                        self.data.nxt_cutvals.extend(minorants.iter().map(|m| m.constant()));
                    }
                    Update::Done { .. } => (), // there's nothing to do
                    Update::Error { err, .. } => return Err(Error::Update(err)),
                }
            } else {
                unreachable!("Only update results allowed during update");
            }
        }

        Ok(have_update)
    }

    /// Return the bound below which the function value must fall to enforce a descent step.
    ///
    /// If the oracle guarantees that $f(\bar{y}) \le$ this bound, the
    /// bundle method will perform a descent step.
    ///
    /// This value is $f(\hat{y}) + \varrho \cdot \Delta$ where
    /// $\Delta = f(\hat{y}) - \hat{f}(\bar{y})$ is the expected
    /// progress and $\varrho$ is the `acceptance_factor`.
    fn get_descent_bound(acceptance_factor: Real, data: &SolverData) -> Real {
        // Expected progress Δ = f(ŷ) − f̂(ȳ), i.e. current value minus model value.
        let expected_progress = data.cur_val - data.nxt_mod;
        data.cur_val - acceptance_factor * expected_progress
    }

    fn show_info(&self, step: Step) {
        let time = self.start_time.elapsed();
        info!(
            "{} {:0>2}:{:0>2}:{:0>2}.{:0>2} {:4} {:4} {:4} {:4}{:1}  {:9.4} {:9.4} \
             {:12.6e}({:12.6e}) {:12.6e}",
            if step == Step::Term { "_endit" } else { "endit " },
            time.as_secs() / 3600,
            (time.as_secs() / 60) % 60,
            time.as_secs() % 60,
            time.subsec_nanos() / 10_000_000,
            self.cnt_descent,
            self.cnt_descent + self.cnt_null,
            self.data.cnt_updates,
            (self.cnt_descent + self.cnt_null) * self.problem.num_subproblems(),
            if step == Step::Descent { "*" } else { " " },
            self.data.cur_weight,
            self.data.expected_progress(),
            self.data.nxt_mod,
            self.data.nxt_val,
            self.data.cur_val
        );
    }

    /// Return the total number of iterations performed in the last call to a `solve*` method.
    pub fn num_iterations(&self) -> usize {
        self.data.cnt_iter
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<<P::Minorant as Minorant>::Primal, P, M> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }

    /// Return the current center of stability.
    ///
    /// This is the stability center $\hat{y}$ stored in the solver data.
    pub fn center(&self) -> &DVector {
        &self.data.cur_y
    }
}

/// A snapshot of the solver's state as produced by [`Saveable::get_state`]
/// and consumed by [`Saveable::set_state`].
///
/// The type parameters are the state types of the terminator (`T`),
/// the weighter (`W`) and the master process (`M`).
#[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))]
#[derive(Clone, Debug)]
pub struct SyncSolverState<T, W, M> {
    /// The solver's iteration data (center, values, counters).
    data: SolverData,
    /// Number of descent steps performed so far.
    cnt_descent: usize,
    /// Number of null steps performed so far.
    cnt_null: usize,
    /// Number of evaluations performed so far.
    cnt_evals: usize,
    /// Saved state of the termination criterion.
    terminator_state: T,
    /// Saved state of the weighting strategy.
    weighter_state: W,
    /// Saved state of the master process, if one had been started.
    master_state: Option<M>,
}

/// Errors that can occur while saving or restoring a solver state.
///
/// The type parameters are the error types of the master problem
/// (`MErr`), the master-process state (`SErr`), the terminator state
/// (`TErr`) and the weighter state (`WErr`).
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SaveError<MErr, SErr, TErr, WErr> {
    /// Rebuilding the master problem from its builder failed.
    #[error("Master problem creation failed")]
    Master(#[source] MErr),
    /// Getting or setting the master-process state failed.
    #[error(transparent)]
    MasterState(SErr),
    /// Getting or setting the terminator state failed.
    #[error(transparent)]
    TerminatorState(TErr),
    /// Getting or setting the weighter state failed.
    #[error(transparent)]
    WeighterState(WErr),
}

impl<P, T, W, M> Saveable for Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    T: Terminator<SolverData> + Saveable + Default,
    W: Weighter<SolverData> + Saveable + Default,
    M: MasterBuilder<P::Minorant>,
    <M as MasterBuilder<P::Minorant>>::MasterProblem: Saveable,
{
    type State = SyncSolverState<T::State, W::State, <MasterProcess<P, M::MasterProblem> as Saveable>::State>;

    type Err = SaveError<
        <M::MasterProblem as MasterProblem<<P as FirstOrderProblem>::Minorant>>::Err,
        <MasterProcess<P, M::MasterProblem> as Saveable>::Err,
        T::Err,
        W::Err,
    >;

    /// Return a snapshot of the current solver state.
    ///
    /// # Errors
    ///
    /// Fails if the terminator, the weighter or the master process
    /// fails to produce its state.
    fn get_state(&self) -> std::result::Result<Self::State, Self::Err> {
        Ok(SyncSolverState {
            data: self.data.clone(),
            cnt_descent: self.cnt_descent,
            cnt_null: self.cnt_null,
            cnt_evals: self.cnt_evals,
            terminator_state: self.terminator.get_state().map_err(SaveError::TerminatorState)?,
            weighter_state: self.weighter.get_state().map_err(SaveError::WeighterState)?,
            // The master state exists only if a master process has been started.
            master_state: self
                .master_proc
                .as_ref()
                .map(|m| m.get_state().map_err(SaveError::MasterState))
                .transpose()?,
        })
    }

    /// Restore the solver from a previously saved state.
    ///
    /// Rebuilds the client channels and, if the snapshot contains a
    /// master state, rebuilds and restarts the master process from it.
    ///
    /// # Errors
    ///
    /// Fails if the terminator, weighter or master-process state
    /// cannot be restored or the master problem cannot be rebuilt.
    fn set_state(&mut self, state: Self::State) -> std::result::Result<(), Self::Err> {
        self.data = state.data;
        self.cnt_descent = state.cnt_descent;
        self.cnt_null = state.cnt_null;
        self.cnt_evals = state.cnt_evals;
        self.terminator
            .set_state(state.terminator_state)
            .map_err(SaveError::TerminatorState)?;
        self.weighter
            .set_state(state.weighter_state)
            .map_err(SaveError::WeighterState)?;

        // Fresh channel pair for communication with the (restarted) master process.
        let (tx, rx) = channel();
        self.client_tx = Some(tx.clone());
        self.client_rx = Some(rx);

        self.master_proc = match state.master_state {
            Some(s) => {
                let master = self.master.build().map_err(SaveError::Master)?;
                let mut proc = MasterProcess::start(master, tx, &mut self.threadpool);
                proc.set_state(s).map_err(SaveError::MasterState)?;
                Some(proc)
            }
            None => None,
        };

        Ok(())
    }
}