RsBundle  Artifact [389d424b07]

Artifact 389d424b07155349591c12ff5e3d48b06a473c19:

  • File src/solver/asyn.rs — part of check-in [9a33baacb9] at 2023-04-28 09:46:59 on branch cvx-constraint — Merge trunk (user: fifr size: 63468) [more...]

/*
 * Copyright (c) 2019-2023 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! An asynchronous parallel bundle solver.

#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, RecvError};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, RecvError};

use float_pretty_print::PrettyPrintFloat;
use log::{debug, info, log_enabled, warn, Level::Debug};
use num_traits::{Float, FromPrimitive, One, ToPrimitive, Zero};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use threadpool::ThreadPool;

use crate::{DVector, Minorant, Real};

use super::channels::{
    ChannelResultSender, ChannelUpdateSender, ClientReceiver, ClientSender, EvalResult, Message, Update,
};
use super::masterprocess::{MasterError, MasterProcess, MasterResponse, Response};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, SubgradientExtender, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

pub mod guessmodels;
pub use guessmodels::GuessModelType;
use guessmodels::{CutModel, Guess, GuessModel, NearestValue};

/// The default iteration limit used by [`Solver::solve`].
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver: standard terminator, HK weight heuristic and the
/// full master problem builder.
pub type DefaultSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::FullMasterBuilder>;

/// The minimal bundle solver: like [`DefaultSolver`] but with the
/// minimal master problem builder.
pub type NoBundleSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::MinimalMasterBuilder>;

/// Error raised by the parallel bundle [`Solver`].
///
/// `PErr` is the error type of the first-order problem and `MErr` the
/// error type of the master problem.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error<PErr, MErr> {
    /// Building the master problem failed.
    #[error("master problem creation failed")]
    BuildMaster(#[source] MErr),
    /// The master process reported an error.
    #[error("error solving the master problem")]
    Master(#[source] MasterError<PErr, MErr>),
    /// The iteration limit was exhausted before termination.
    #[error("iteration limit ({limit}) has been reached")]
    IterationLimit { limit: usize },
    /// A subproblem evaluation failed.
    #[error("problem evaluation failed")]
    Evaluation(PErr),
    /// A problem update failed.
    #[error("problem update failed")]
    Update(PErr),
    /// Extending subgradients to newly added variables failed.
    #[error("subgradient extension failed")]
    SubgradientExtension(PErr),
    /// Some input vector has an unexpected dimension.
    #[error("wrong dimension for {what}")]
    Dimension { what: String },
    /// Inconsistent variable bounds were supplied.
    #[error("invalid variable bounds (lower: {lower}, upper: {upper})")]
    InvalidBounds { lower: Real, upper: Real },
    /// A convexity-constrained variable has a non-zero lower bound.
    #[error("Lower bound for convexity constraint variable with index {0} must be zero.")]
    InvalidConvexityLowerBound(usize),
    /// A convexity-constrained variable has a finite upper bound.
    #[error("Convexity constraint variable with index {0} must be unbounded from above.")]
    InvalidConvexityUpperBound(usize),
    /// A channel to a subprocess was closed unexpectedly.
    #[error("disconnected receiving channel to a subprocess")]
    DisconnectedReceiver(#[source] RecvError),
    /// The operation requires [`Solver::init`] to have been called first.
    #[error("solver has not been initialized")]
    NotInitialized,
    /// The operation requires the problem to have been solved first.
    #[error("problem has not been solved")]
    NotSolved,
    /// A subproblem returned no function value during initialization.
    #[error("missing function value during initialization for subproblem {0}")]
    MissingObjective(usize),
    /// A subproblem returned no minorant during initialization.
    #[error("missing minorant during initialization for subproblem {0}")]
    MissingMinorant(usize),
}

/// The result type of the solver.
///
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
///
/// The error type combines the problem's error type with the error type
/// of the master problem produced by `M`.
pub type Result<T, P, M> = std::result::Result<
    T,
    Error<
        <P as FirstOrderProblem>::Err,
        <<M as MasterBuilder<<P as FirstOrderProblem>::Minorant>>::MasterProblem as MasterProblem<
            <P as FirstOrderProblem>::Minorant,
        >>::Err,
    >,
>;

impl<PErr, MErr> From<MasterError<PErr, MErr>> for Error<PErr, MErr> {
    fn from(err: MasterError<PErr, MErr>) -> Error<PErr, MErr> {
        match err {
            MasterError::SubgradientExtension(e) => Error::SubgradientExtension(e),
            _ => Error::Master(err),
        }
    }
}

impl<PErr, MErr> From<RecvError> for Error<PErr, MErr> {
    fn from(err: RecvError) -> Error<PErr, MErr> {
        Error::DisconnectedReceiver(err)
    }
}

/// Identifier for a subproblem evaluation.
///
/// Attached to every evaluation request so that asynchronous results
/// can be matched back to the subproblem and the candidate point (via
/// the candidate's global index) they belong to.
#[derive(Clone, Copy, Debug)]
struct EvalId {
    /// The index of the subproblem.
    subproblem: usize,
    /// The index of the candidate at which the subproblem is evaluated.
    candidate_index: usize,
}

/// An evaluation point.
///
/// The coordinates are shared via `Arc` so a point can be cloned
/// cheaply; the globally unique `index` allows two points to be
/// compared without touching the coordinates.
#[derive(Clone)]
pub struct Point {
    /// The globally unique index of the evaluation point.
    index: usize,

    /// The evaluation point itself.
    point: Arc<DVector>,
}

impl Point {
    /// Euclidean distance between this evaluation point and `p`.
    ///
    /// Points sharing the same global index are the same point, so
    /// their distance is zero without touching the coordinates.
    fn distance(&self, p: &Point) -> Real {
        if self.index == p.index {
            return Real::zero();
        }
        let mut diff = self.point.as_ref().clone();
        diff.add_scaled(-1.0, &p.point);
        diff.norm2()
    }
}

impl Default for Point {
    /// An empty evaluation point with index 0 and no coordinates.
    fn default() -> Point {
        let point = Arc::new(dvec![]);
        Point { point, index: 0 }
    }
}

/// Parameters for tuning the solver.
#[derive(Debug, Clone)]
pub struct Parameters {
    /// The guess model type to be used.
    ///
    /// The default is `GuessModelType::CutModel`.
    pub guess_model: GuessModelType,

    /// The increase-factor of the Lipschitz-constant guess, must be in (0,1].
    ///
    /// The default value is 0.1.
    l_guess_factor: Real,

    /// The descent step acceptance factors, must be in (0,1).
    ///
    /// The default value is 0.5.
    /// NOTE(review): this doc previously claimed 0.1, but `Default`
    /// sets 0.5 — confirm which value is intended.
    acceptance_factor: Real,

    /// The factor for asynchronous imprecision, must be in (0,1).
    ///
    /// The default value is 0.9.
    imprecision_factor: Real,
}

impl Default for Parameters {
    /// The default tuning parameters (see the field documentation of
    /// `Parameters` for the meaning of each value).
    fn default() -> Self {
        Parameters {
            imprecision_factor: 0.9,
            acceptance_factor: 0.5,
            l_guess_factor: 0.1,
            guess_model: GuessModelType::CutModel,
        }
    }
}

impl Parameters {
    /// Change the Lipschitz-guess increase factor.
    ///
    /// # Panics
    ///
    /// Panics unless `l_guess_factor` lies in (0,1].
    pub fn set_l_guess_factor(&mut self, l_guess_factor: Real) {
        assert!(
            l_guess_factor > 0.0 && l_guess_factor <= 1.0,
            "Lipschitz-guess increase factor must be in (0,1], got: {}",
            l_guess_factor
        );
        self.l_guess_factor = l_guess_factor;
    }

    /// Change the descent step acceptance factor.
    ///
    /// NOTE(review): this doc previously claimed a default of 0.1, but
    /// `Parameters::default` sets 0.5 — confirm which is intended.
    ///
    /// # Panics
    ///
    /// Panics unless `acceptance_factor` lies in (0,1).
    pub fn set_acceptance_factor(&mut self, acceptance_factor: Real) {
        assert!(
            acceptance_factor > 0.0 && acceptance_factor < 1.0,
            "Descent step acceptance factors must be in (0,1), got: {}",
            acceptance_factor
        );
        self.acceptance_factor = acceptance_factor;
    }

    /// Change the imprecision acceptance factor.
    ///
    /// The default value is 0.9.
    ///
    /// # Panics
    ///
    /// Panics unless `imprecision_factor` lies in (0,1).
    pub fn set_imprecision_factor(&mut self, imprecision_factor: Real) {
        assert!(
            imprecision_factor > 0.0 && imprecision_factor < 1.0,
            "Imprecision factor must be in (0,1), got: {}",
            imprecision_factor
        );
        self.imprecision_factor = imprecision_factor;
    }
}

/// The step type that has been performed.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Step {
    /// A null step has been performed (the center is kept).
    Null,
    /// A descent step has been performed (the center is moved).
    Descent,
    /// No step but the algorithm has been terminated.
    Term,
}

/// The mutable algorithmic state of the bundle solver.
pub struct SolverData<Mn, E> {
    /// Current center of stability.
    cur_y: Point,

    /// Function value in the current point.
    cur_val: Real,

    /// Step direction (i.e. nxt_y - cur_y).
    nxt_d: Arc<DVector>,

    /// Current candidate.
    nxt_y: Point,

    /// Function value at the current candidate.
    nxt_val: Real,

    /// Model value at the current candidate.
    nxt_mod: Real,

    /// The current expected progress.
    ///
    /// This value is actually `cur_val - nxt_val`. We store it separately only
    /// for debugging purposes because after a descent step `cur_val` will be
    /// changed and we could not see the "old" expected progress anymore that
    /// led to the descent step.
    expected_progress: Real,

    /// Norm of current aggregated subgradient.
    sgnorm: Real,

    /// The error bound for the last descent step.
    error_bound: Real,

    /// The currently used master problem weight.
    cur_weight: Real,

    /// Maximal number of iterations.
    max_iter: usize,

    /// Number of inner model updates.
    cnt_updates: usize,

    /// Number of descent steps.
    cnt_descent: usize,

    /// The number of subproblems with insufficient evaluation data.
    num_insufficient_candidates: usize,

    /// The number of subproblems that have not been evaluated exactly in the center.
    num_inexact_center: usize,

    /// Whether only synchronized steps are accepted.
    force_synchronized: bool,

    /// Subproblem data.
    subs: Vec<SubData<Mn, E>>,

    /// The list of all evaluation points.
    candidates: Vec<Point>,

    /// Whether we need a new update.
    need_update: bool,

    /// Whether a problem update is currently in progress.
    update_in_progress: bool,

    /// Whether the termination criterion has been satisfied but we wait for an
    /// update.
    pending_termination: bool,
}

impl<Mn, E> SolverData<Mn, E> {
    /// Reset solver data to initial values.
    ///
    /// Almost everything is set to infinity so that a null-step is
    /// forced after the first evaluation; `y` becomes both the center
    /// of stability and the first candidate.
    fn init(&mut self, y: Point) {
        // Center and candidate both start at the given point.
        self.nxt_y = y.clone();
        self.cur_y = y;

        // Unknown quantities start at +/- infinity so that the first
        // real evaluation always improves on them.
        self.cur_val = Real::infinity();
        self.nxt_val = Real::infinity();
        self.nxt_mod = -Real::infinity();
        self.expected_progress = Real::infinity();
        self.error_bound = Real::infinity();
        self.sgnorm = Real::infinity();

        // Counters, weight and bookkeeping flags.
        self.cur_weight = 1.0;
        self.cnt_descent = 0;
        self.num_insufficient_candidates = 0;
        self.num_inexact_center = self.subs.len();
        self.force_synchronized = false;
        self.pending_termination = false;
        self.candidates.clear();
    }
}

impl<Mn, E> Default for SolverData<Mn, E> {
    /// Empty solver state: no subproblems, no candidates, zeroed
    /// statistics and an update marked as required.
    fn default() -> SolverData<Mn, E> {
        SolverData {
            // Points and direction start empty.
            cur_y: Point::default(),
            nxt_y: Point::default(),
            nxt_d: Arc::new(dvec![]),

            // Values and progress measures.
            cur_val: 0.0,
            nxt_val: 0.0,
            nxt_mod: 0.0,
            expected_progress: 0.0,
            sgnorm: 0.0,
            error_bound: Real::infinity(),
            cur_weight: 1.0,

            // Iteration bookkeeping.
            max_iter: 0,
            cnt_updates: 0,
            cnt_descent: 0,
            num_insufficient_candidates: 0,
            num_inexact_center: 0,

            // Subproblems and evaluation points.
            subs: vec![],
            candidates: vec![],

            // State flags.
            force_synchronized: false,
            need_update: true,
            update_in_progress: false,
            pending_termination: false,
        }
    }
}

impl<Mn, E> StandardTerminatable for SolverData<Mn, E> {
    /// The function value at the current center of stability.
    fn center_value(&self) -> Real {
        self.cur_val
    }

    /// The currently expected progress (stored `cur_val - nxt_val`).
    fn expected_progress(&self) -> Real {
        self.expected_progress
    }
}

impl<Mn, E> HKWeightable for SolverData<Mn, E> {
    /// The weight currently used in the master problem.
    fn current_weight(&self) -> Real {
        self.cur_weight
    }

    /// The coordinates of the current center of stability.
    fn center(&self) -> &DVector {
        &self.cur_y.point
    }

    /// The function value at the center.
    fn center_value(&self) -> Real {
        self.cur_val
    }

    /// The function value at the current candidate.
    fn candidate_value(&self) -> Real {
        self.nxt_val
    }

    /// The model value at the current candidate.
    fn candidate_model(&self) -> Real {
        self.nxt_mod
    }

    /// The cut value for new cuts; here this is the center value.
    fn new_cutvalue(&self) -> Real {
        self.cur_val
    }

    /// Norm of the current aggregated subgradient.
    fn sgnorm(&self) -> Real {
        self.sgnorm
    }
}

/// Data providing access for updating the problem.
struct UpdateData<Pr> {
    /// Type of step.
    step: Step,

    /// Current center of stability.
    cur_y: Arc<DVector>,

    /// Current candidate.
    nxt_y: Arc<DVector>,

    /// The aggregated primal data, one entry per subproblem (served via
    /// `UpdateState::aggregated_primal`).
    primal_aggrs: Vec<Pr>,
}

impl<Pr> UpdateState<Pr> for UpdateData<Pr>
where
    Pr: Send + 'static,
{
    /// Whether the recorded step was a descent step.
    fn was_descent(&self) -> bool {
        matches!(self.step, Step::Descent)
    }

    /// The center of stability at the time of the update.
    fn center(&self) -> &DVector {
        self.cur_y.as_ref()
    }

    /// The candidate at the time of the update.
    fn candidate(&self) -> &DVector {
        self.nxt_y.as_ref()
    }

    /// The aggregated primal data of subproblem `i`.
    fn aggregated_primal(&self, i: usize) -> &Pr {
        &self.primal_aggrs[i]
    }
}

/// Allowed change of the Lipschitz-constant.
///
/// The payload is the interpolation factor applied when moving the
/// current guess towards a new estimate.
#[derive(PartialEq, Clone, Copy, Debug)]
enum UpdateL {
    /// L-constant is allowed to increase by the given factor.
    Increase(Real),
    /// L-constant is allowed to decrease by the given factor.
    Decrease(Real),
}

/// Result of a model update.
enum UpdateResult {
    /// The model has been modified.
    Modified,
    /// The model has not been modified.
    Unmodified,
    /// The pending termination has succeeded.
    Term,
}

/// Model data of a single subproblem.
///
/// This struct does not handle the subproblem model itself. However, it handles
/// the asynchronous precision data, i.e. the guessed Lipschitz-constant and the
/// distance of the evaluation points to the candidate.
///
/// The concrete model used for computing the guessed values in the candidate
/// and the center must be provided by an implementation of `SubProblem`.
struct SubData<Mn, E> {
    /// The index associated with this subproblem.
    fidx: usize,
    /// The guess model of the subproblem.
    sub: Box<dyn GuessModel<Mn, E>>,
    /// The current center.
    center: Point,
    /// The current candidate.
    candidate: Point,
    /// The last index at which the evaluation has been started.
    last_eval_index: usize,
    /// Whether a subproblem evaluation is currently running.
    is_running: bool,
    /// Whether the last evaluation has been sufficiently close.
    is_close_enough: bool,
    /// The original guess value and its evaluation distance in the current center.
    center_guess: Guess,
    /// The current guess of the Lipschitz constant (0 until learned).
    l_guess: Real,
    /// Number of evaluations.
    cnt_evals: usize,
}

impl<Mn, E> SubData<Mn, E> {
    /// Create the model data for subproblem `fidx` with initial point `y`.
    ///
    /// Center and candidate both start at `y`; the Lipschitz guess
    /// starts at zero and no evaluation is running.
    fn new(fidx: usize, sub: Box<dyn GuessModel<Mn, E>>, y: &Point) -> SubData<Mn, E> {
        SubData {
            fidx,
            sub,
            last_eval_index: 0,
            center: y.clone(),
            candidate: y.clone(),
            is_running: false,
            is_close_enough: false,
            center_guess: Guess::default(),
            l_guess: 0.0,
            cnt_evals: 0,
        }
    }

    /// Set the center of this model to `y`, which must be the current
    /// candidate, and update the guess of the Lipschitz constant.
    ///
    /// `l_guess_factor` determines in which direction the Lipschitz
    /// guess may move and by which interpolation factor (in (0,1]).
    fn move_center(&mut self, y: &Point, l_guess_factor: UpdateL) {
        assert_eq!(y.index, self.candidate.index, "Must move to current candidate");

        // The guess value used in the current (i.e. old) center
        let old_guess = self.center_guess;
        // The cut value now known for the center.
        let old_cutvalue = self.sub.get_lower_bound(&self.center);

        // Unpack the allowed direction of change and the interpolation
        // factor. (This was an always-`Some` match wrapped in `if let`.)
        let (factor, increase) = match l_guess_factor {
            UpdateL::Increase(v) => (v, true),
            UpdateL::Decrease(v) => (v, false),
        };
        debug_assert!(
            0.0 < factor && factor <= 1.0,
            "Invalid l_guess_factor: {:?}",
            factor
        );

        // There has been a previous evaluation, so first update the
        // Lipschitz guess from the observed error per unit distance.
        if old_guess.dist > Real::zero() {
            debug!(
                "L-guess fidx:{} guess:{} cut:{}",
                self.fidx, old_guess.value, old_cutvalue
            );
            // An infinite evaluation distance carries no information,
            // so fall back to an estimate of 0 in that case.
            let new_l_guess = if old_guess.dist.is_finite() {
                (old_cutvalue - old_guess.value) / old_guess.dist
            } else {
                Real::zero()
            };

            // Only move the guess in the allowed direction and never
            // accept a negative estimate; interpolate by `factor`
            // between the old guess and the new estimate.
            if increase == (new_l_guess > self.l_guess) && new_l_guess >= 0.0 {
                let new_l_guess = self.l_guess + factor * (new_l_guess - self.l_guess);
                debug!(
                    "New l_guess fidx:{} old-L:{} L:{}",
                    self.fidx, self.l_guess, new_l_guess,
                );
                self.l_guess = new_l_guess;
            }
        }

        // Save the new center
        self.center = y.clone();

        // Save guess value of the candidate/new center
        self.center_guess = self.sub.get_guess_value(&self.center);
    }

    /// Set the candidate of this model and recompute whether the last
    /// evaluation is close enough to it.
    fn update_candidate(&mut self, y: &Point, accept_factor: Real) {
        self.candidate = y.clone();
        self.update_close_enough(accept_factor);
    }

    /// Add a function value to this model.
    ///
    /// The `accept_factor` is a parameter for possibly accepting
    /// the candidate guess value as "good enough" (if it has been
    /// changed by the new value).
    fn add_function_value(&mut self, y: &Point, value: Real, accept_factor: Real) {
        self.sub.add_function_value(y, value);
        self.update_close_enough(accept_factor)
    }

    /// Add a minorant to this model.
    ///
    /// The `accept_factor` is a parameter for possibly accepting
    /// the candidate guess value as "good enough" (if it has been
    /// changed by the new minorant).
    ///
    /// The minorant must be centered at the global 0.
    fn add_minorant(&mut self, y: &Point, m: &Arc<Mn>, accept_factor: Real) {
        self.sub.add_minorant(y, m);
        self.update_close_enough(accept_factor)
    }

    /// Add `nnew` variables to this model, extending the stored center
    /// and candidate with zeros and the model via `sgext`.
    fn add_vars(&mut self, nnew: usize, sgext: &dyn SubgradientExtender<Mn, E>) -> std::result::Result<(), E> {
        Arc::make_mut(&mut self.center.point).extend(repeat(Real::zero()).take(nnew));
        Arc::make_mut(&mut self.candidate.point).extend(repeat(Real::zero()).take(nnew));
        self.sub.add_vars(nnew, sgext)
    }

    /// Return the current guess value at the given point.
    fn get_guess_value(&mut self, y: &Point) -> Guess {
        self.sub.get_guess_value(y)
    }

    /// Return the lower bound at the given point.
    fn get_lower_bound(&mut self, y: &Point) -> Real {
        self.sub.get_lower_bound(y)
    }

    /// Recompute the close-enough flag for the current candidate: an
    /// exact guess, or an estimated error (distance times Lipschitz
    /// guess) within `accept_factor`, counts as close enough.
    fn update_close_enough(&mut self, accept_factor: Real) {
        let g = self.sub.get_guess_value(&self.candidate);
        self.is_close_enough = g.is_exact() || g.dist * self.l_guess <= accept_factor
    }

    /// Return the error estimation in the center.
    ///
    /// This is the difference between the (current) lower bound and the used
    /// guess value.
    fn error_estimate(&mut self) -> Real {
        self.sub.get_lower_bound(&self.center) - self.center_guess.value
    }

    /// The guess value recorded when the current center was set.
    fn center_guess_value(&self) -> Real {
        self.center_guess.value
    }
}

/// Implementation of a parallel bundle method.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter, M = crate::master::FullMasterBuilder>
where
    P: FirstOrderProblem,
    M: MasterBuilder<P::Minorant>,
{
    /// Parameters for the solver.
    pub params: Parameters,

    /// Termination predicate.
    pub terminator: T,

    /// Weighter heuristic.
    pub weighter: W,

    /// The threadpool of the solver.
    pub threadpool: ThreadPool,

    /// The master problem builder.
    pub master: M,

    /// The first order problem.
    problem: P,

    /// The algorithm data.
    data: SolverData<P::Minorant, P::Err>,

    /// The master problem process.
    master_proc: Option<MasterProcess<P, M::MasterProblem>>,

    /// Whether there is currently a master computation running.
    master_running: bool,

    /// Whether the master problem has been changed.
    master_has_changed: bool,

    /// The last subgradient extender.
    sgext: Option<Arc<dyn SubgradientExtender<P::Minorant, P::Err>>>,

    /// The sending end of the channel on which subprocesses report
    /// results (a clone is handed to the master process in `init`).
    client_tx: Option<ClientSender<EvalId, P, M::MasterProblem>>,

    /// The channel to receive the evaluation results from subproblems.
    client_rx: Option<ClientReceiver<EvalId, P, M::MasterProblem>>,

    /// Time when the solution process started.
    ///
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Send,
    T: Terminator<SolverData<P::Minorant, P::Err>> + Default,
    W: Weighter<SolverData<P::Minorant, P::Err>> + Default,
    M: MasterBuilder<P::Minorant>,
{
    /// Create a new parallel bundle solver.
    ///
    /// Uses the default master builder and the default [`Parameters`].
    pub fn new(problem: P) -> Self
    where
        M: Default,
    {
        Self::with_master(problem, M::default(), Parameters::default())
    }

    /// Create a new parallel bundle solver with given parameters.
    ///
    /// Uses the default master builder.
    pub fn with_params(problem: P, params: Parameters) -> Self
    where
        M: Default,
    {
        Self::with_master(problem, M::default(), params)
    }

    /// Create a new parallel bundle solver with master problem and parameters.
    ///
    /// The solver's threadpool is sized to the number of available
    /// CPUs; channels and the master process are created later in
    /// `init`.
    pub fn with_master(problem: P, master: M, params: Parameters) -> Self {
        let worker_count = num_cpus::get();
        info!("Initializing asynchronous solver with {} CPUs", worker_count);
        let threadpool = ThreadPool::with_name("Parallel bundle solver".to_string(), worker_count);

        Solver {
            problem,
            master,
            params,
            threadpool,
            data: SolverData::default(),
            terminator: Default::default(),
            weighter: Default::default(),

            // Everything process-related is created on `init`.
            master_proc: None,
            master_running: false,
            master_has_changed: false,
            sgext: None,
            client_tx: None,
            client_rx: None,

            start_time: Instant::now(),
        }
    }

    /// Return the underlying threadpool.
    ///
    /// In order to use the same threadpool for concurrent processes,
    /// just clone the returned `ThreadPool`.
    pub fn threadpool(&self) -> &ThreadPool {
        &self.threadpool
    }

    /// Set the threadpool.
    ///
    /// This function allows to use a specific threadpool for all processes
    /// spawned by the solver. Note that this does not involve any threads
    /// used by the problem because the solver is not responsible for executing
    /// the evaluation process of the subproblems. However, the problem might
    /// use the same threadpool as the solver.
    pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
        self.threadpool = threadpool;
    }

    /// Return a reference to the problem associated with the solver.
    pub fn problem(&self) -> &P {
        &self.problem
    }

    /// Consume the solver and return the underlying problem.
    pub fn into_problem(self) -> P {
        self.problem
    }

    /// Initialize the solver.
    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`Solver::solve`].
    ///
    /// # Errors
    ///
    /// Fails if the bounds have the wrong dimension or violate the
    /// convexity-constraint requirements, if the master problem cannot
    /// be built, or if an initial subproblem evaluation fails.
    pub fn init(&mut self) -> Result<(), P, M> {
        debug!("Initialize solver");
        debug!("{:?}", self.params);

        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();

        // Fresh result channel shared by all worker processes.
        let (tx, rx) = channel();
        self.client_tx = Some(tx.clone());
        self.client_rx = Some(rx);

        // Validate the dimensions of the (optional) variable bounds.
        let lower_bounds = self.problem.lower_bounds().map(DVector);
        if lower_bounds.as_ref().map(|lb| lb.len() != n).unwrap_or(false) {
            return Err(Error::Dimension {
                what: "lower bounds".to_string(),
            });
        }

        let upper_bounds = self.problem.upper_bounds().map(DVector);
        if upper_bounds.as_ref().map(|ub| ub.len() != n).unwrap_or(false) {
            return Err(Error::Dimension {
                what: "upper bounds".to_string(),
            });
        }

        // Group the variables of each convexity constraint; such a
        // variable must have lower bound 0 and no upper bound.
        let mut constraint_sets = HashMap::new();
        for i in 0..n {
            if let Some(k) = self.problem.constraint_index(i) {
                if lower_bounds.as_ref().map(|lb| lb[i] != 0.0).unwrap_or(true) {
                    return Err(Error::InvalidConvexityLowerBound(i));
                }
                if upper_bounds
                    .as_ref()
                    .map(|ub| ub[i] != Real::infinity())
                    .unwrap_or(false)
                {
                    return Err(Error::InvalidConvexityUpperBound(i));
                }
                constraint_sets.entry(k).or_insert_with(|| vec![]).push(i);
            }
        }

        // compute initial y: zero everywhere, except that the variables
        // of each convexity set share the unit mass equally
        let mut init_y = dvec![Real::zero(); n];
        for convexity_set in constraint_sets.values() {
            let value = Real::one() / Real::from_usize(convexity_set.len()).unwrap();
            for &i in convexity_set {
                init_y[i] = value;
            }
        }

        self.data.init(Point {
            index: 1,
            point: Arc::new(init_y),
        });

        debug!("Start master process");
        let mut master = self.master.build().map_err(Error::BuildMaster)?;
        master.set_num_subproblems(m).map_err(Error::BuildMaster)?;
        master
            .set_vars(n, lower_bounds, upper_bounds)
            .map_err(Error::BuildMaster)?;
        master.set_convexity_sets(constraint_sets.into_values());
        // we must set the center correctly
        master.move_center(1.0, &self.data.cur_y.point);

        self.master_proc = Some(MasterProcess::start(master, tx, &mut self.threadpool));

        debug!("Initial problem evaluation");
        // The initial evaluation point.
        self.data.nxt_y = self.data.cur_y.clone();
        // We need an initial evaluation of all oracles for the first center.
        info!("Guess model type: {:?}", self.params.guess_model);
        self.data.subs = (0..m)
            .map(|fidx| {
                SubData::new(
                    fidx,
                    match self.params.guess_model {
                        GuessModelType::NearestValue => Box::new(NearestValue::new(fidx)),
                        GuessModelType::CutModel => Box::new(CutModel::new(fidx)),
                    },
                    &self.data.cur_y,
                )
            })
            .collect();

        // This could be done better: the initial evaluation has index 1!
        // (The point is therefore stored twice so that position 1 is valid.)
        self.data.candidates.push(self.data.nxt_y.clone());
        self.data.candidates.push(self.data.nxt_y.clone());

        // Kick off the evaluation of every subproblem at the initial point.
        for i in 0..m {
            self.evaluate_subproblem(i)?;
        }

        self.start_time = Instant::now();

        // wait for all subproblem evaluations.
        let mut cnt_remaining = self.problem.num_subproblems();

        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        let client_rx = self.client_rx.as_ref().ok_or(Error::NotInitialized)?;

        // Collect objective values and minorants until every subproblem
        // reported `Done` (or one of them failed).
        while cnt_remaining > 0 {
            let msg = client_rx.recv();
            match msg? {
                Message::Eval(m) => match m {
                    EvalResult::ObjectiveValue { index, value } => {
                        assert_eq!(
                            index.candidate_index, self.data.nxt_y.index,
                            "Receive objective value for unexpected candidate"
                        );
                        self.data.subs[index.subproblem].add_function_value(&self.data.nxt_y, value, 0.0);
                    }
                    EvalResult::Minorant { index, mut minorant } => {
                        assert_eq!(
                            index.candidate_index, self.data.nxt_y.index,
                            "Receive objective value for unexpected candidate"
                        );
                        // Add the minorant to the master problem.
                        // The minorant is centered at the candidate == center, so it does
                        // not need to be moved.
                        master.add_minorant(index.subproblem, minorant.clone())?;
                        self.master_has_changed = true;
                        // Center the minorant at 0.
                        minorant.move_center(-1.0, &self.data.nxt_y.point);
                        self.data.subs[index.subproblem].add_minorant(&self.data.nxt_y, &Arc::new(minorant), 0.0);
                    }
                    EvalResult::Done { index } => {
                        self.data.subs[index.subproblem].is_running = false;
                        cnt_remaining -= 1;
                    }
                    EvalResult::Error { err, index } => {
                        self.data.subs[index.subproblem].is_running = false;
                        return Err(Error::Evaluation(err));
                    }
                },
                Message::Update(_) => {
                    unreachable!("Receive update response during initialization");
                }
                Message::Master(_) => {
                    unreachable!("Receive master response during initialization");
                }
            }
        }

        // Set the initial values.
        // For the center this is the current lower bound (cut value),
        // for the candidate it is the model candidate value.
        //
        // Note that both are the same ...
        self.data.cur_val = 0.0;
        self.data.nxt_val = 0.0;
        for s in &mut self.data.subs {
            self.data.cur_val += s.get_lower_bound(&self.data.nxt_y);
            self.data.nxt_val += s.get_guess_value(&self.data.nxt_y).value;
        }
        assert!((self.data.cur_val - self.data.nxt_val).abs() < 1e-6);

        // This is the first evaluation. We effectively get
        // the function value at the current center but
        // we do not have a model estimate yet. Hence, we do not know
        // a good guess for the weight.
        self.data.cur_weight = Real::infinity();
        self.data.need_update = true;

        // Compute the initial candidate; the master must answer with
        // exactly one response here.
        self.update_candidate()?;
        if let Ok(msg) = self.client_rx.as_ref().ok_or(Error::NotInitialized)?.recv() {
            if let Message::Master(m) = msg {
                self.handle_master_response(m)?;
            } else {
                unreachable!(
                    "Unexpected non-master response during initial candidate computation: {:?}",
                    msg
                );
            }
        } else {
            todo!("What to do with this error?");
        }

        // Start an initial problem update.
        self.update_problem(Step::Descent)?;

        debug!("First Step");
        debug!("  cur_val={}", self.data.cur_val);
        self.data.cnt_descent += 1;

        self.show_info(Step::Descent);

        debug!("Initialization complete");

        Ok(())
    }

    /// Reset data for new iterations.
    ///
    /// Sets the iteration budget to `max_iter`, restarts the update
    /// counter and re-centers the candidate at the current center of
    /// stability (with a zero step direction and a model-guessed value).
    fn reset_iteration_data(&mut self, max_iter: usize) {
        let num_variables = self.problem.num_variables();

        self.data.max_iter = max_iter;
        self.data.cnt_updates = 0;
        self.data.nxt_d = Arc::new(dvec![0.0; num_variables]);
        self.data.nxt_y = self.data.cur_y.clone();
        // Borrow the candidate separately so the subproblems can be
        // mutated while summing their guess values at that point.
        let nxt_y = &self.data.nxt_y;
        self.data.nxt_val = self
            .data
            .subs
            .iter_mut()
            .map(|s| s.get_guess_value(nxt_y).value)
            .sum::<Real>();
        self.data.need_update = true;
        // TODO: there might be an update in progress -> the 'DONE' message should be
        // in the message queue ...
        //self.data.update_in_progress = false;
    }

    /// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
    ///
    /// Convenience wrapper around [`Self::solve_with_limit`].
    pub fn solve(&mut self) -> Result<(), P, M> {
        self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
    }

    /// Solve the problem with a maximal iteration limit.
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), P, M> {
        // First initialize the internal data structures.
        self.init()?;

        if self.solve_iter(limit)? {
            Ok(())
        } else {
            Err(Error::IterationLimit { limit })
        }
    }

    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied. Otherwise it returns `Ok(false)` or an
    /// error code.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. Because of this one *must*
    /// call `init()` before the first call to this function.
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, P, M> {
        debug!("Start solving up to {} iterations", niter);

        self.reset_iteration_data(niter);

        // Main message loop: dispatch responses from the subproblem
        // evaluators, the problem updater and the master process until one of
        // the handlers signals termination.
        loop {
            let msg = self.client_rx.as_ref().ok_or(Error::NotInitialized)?.recv()?;
            match msg {
                Message::Eval(m) => {
                    // Receive an evaluation result (objective value, minorant,
                    // completion or error) from a subproblem.
                    if self.handle_client_response(m)? {
                        return Ok(true);
                    }
                }
                Message::Update(msg) => {
                    debug!("Receive update response");
                    match self.handle_update_response(msg)? {
                        UpdateResult::Modified => {
                            // The master problem has been changed so we need a new
                            // candidate as well.
                            debug!("Master problem modified, request new candidate");
                            self.update_candidate()?;
                        }
                        UpdateResult::Unmodified => (),
                        UpdateResult::Term => return Ok(true),
                    }
                }
                Message::Master(mresponse) => {
                    debug!("Receive master response");
                    // Receive result (new candidate) from the master
                    if self.handle_master_response(mresponse)? {
                        return Ok(true);
                    }
                }
            }
        }
    }

    /// Handle a response `msg` from a subproblem evaluation.
    ///
    /// A subproblem evaluation reports either an (upper bound on the)
    /// function value at some point, a subgradient (linear minorant) of the
    /// function at some point, the completion of an evaluation, or an error.
    /// Value and minorant results are forwarded to the dedicated handlers,
    /// which may trigger a descent or null step.
    ///
    /// The function returns
    ///   - `Ok(true)` if the final iteration count has been reached,
    ///   - `Ok(false)` if the final iteration count has not been reached,
    ///   - `Err(_)` on error.
    fn handle_client_response(&mut self, msg: EvalResult<EvalId, P::Minorant, P::Err>) -> Result<bool, P, M> {
        match msg {
            EvalResult::ObjectiveValue { index, value } => self.handle_new_objective(index, value),
            EvalResult::Minorant { index, minorant } => self.handle_new_minorant(index, minorant),
            EvalResult::Done { index } => {
                // The evaluation finished: mark the subproblem idle and
                // possibly restart it for the current candidate.
                self.data.subs[index.subproblem].is_running = false;
                self.evaluate_subproblem(index.subproblem)?;
                Ok(false)
            }
            EvalResult::Error { err, index } => {
                // The evaluation failed; the subproblem is no longer running.
                self.data.subs[index.subproblem].is_running = false;
                Err(Error::Evaluation(err))
            }
        }
    }

    /// A new objective value has been computed.
    ///
    /// The value is folded into the guess model of the reporting subproblem.
    /// The cached aggregated values at the candidate (`nxt_val`) and the
    /// center (`cur_val`) are corrected by the change this causes in the
    /// submodel. Values for unknown candidate indices are ignored. Finally a
    /// (possibly inexact) step is attempted via [`Self::maybe_do_step`].
    fn handle_new_objective(&mut self, id: EvalId, value: Real) -> Result<bool, P, M> {
        debug!(
            "Receive objective from subproblem:{} candidate:{} current:{} obj:{}",
            id.subproblem, id.candidate_index, self.data.nxt_y.index, value
        );

        // check if new evaluation is closer to the current candidate
        let sub = &mut self.data.subs[id.subproblem];
        // Whether the subproblem evaluation had been good enough.
        let was_close_enough = sub.is_close_enough;

        if let Some(nxt) = self.data.candidates.get_mut(id.candidate_index) {
            // Per-subproblem acceptance threshold: the expected progress is
            // distributed uniformly over the subproblems.
            let accept_factor =
                self.params.imprecision_factor * self.params.acceptance_factor * self.data.expected_progress
                    / self.problem.num_subproblems().to_f64().unwrap();

            // Record the submodel values before/after adding the function
            // value and update the cached aggregates by the difference.
            let old_can_val = sub.get_guess_value(&self.data.nxt_y).value;
            let old_cen_val = sub.get_lower_bound(&self.data.cur_y);
            sub.add_function_value(nxt, value, accept_factor);
            let new_can_val = sub.get_guess_value(&self.data.nxt_y).value;
            let new_cen_val = sub.get_lower_bound(&self.data.cur_y);

            self.data.nxt_val += new_can_val - old_can_val;
            self.data.cur_val += new_cen_val - old_cen_val;

        // This is just a safe-guard: if the function has been evaluated at
        // the current candidate, the evaluation *must* be good enough.
        //
        // Unfortunately, that safe-guard does not work with guess-models that do not use the function value (like `CutModel`) ....
        //

        // assert!(
        //     sub.is_close_enough || sub.last_eval_index < self.data.nxt_y.index,
        //     "Unexpected insufficiency fidx:{} l:{}",
        //     id.subproblem,
        //     sub.l_guess,
        // );
        } else {
            // unknown candidate -> ignore objective value
            warn!("Ignore unknown candidate index:{}", id.candidate_index);
            return Ok(false);
        }

        // Test if the new candidate is close enough for the asynchronous
        // precision test.
        if !was_close_enough && sub.is_close_enough {
            debug!(
                "Accept result fidx:{} index:{} candidate:{} (remaining insufficient: {})",
                id.subproblem, id.candidate_index, self.data.nxt_y.index, self.data.num_insufficient_candidates
            );
        }

        self.maybe_do_step(false)
    }

    /// Add a new minorant.
    ///
    /// The minorant is added to the submodel as well as the master problem. The modified submodel
    /// may then lead to a potential null/descent-step.
    ///
    /// Minorants referring to an unknown candidate index are ignored.
    ///
    /// Return values
    ///   - `Ok(true)` if the termination criterion has been satisfied,
    ///   - `Ok(false)` if the termination criterion has not been satisfied,
    ///   - `Err(_)` on error.
    fn handle_new_minorant(&mut self, id: EvalId, mut minorant: P::Minorant) -> Result<bool, P, M> {
        debug!(
            "Receive minorant subproblem:{} candidate:{} current:{} center:{}",
            id.subproblem, id.candidate_index, self.data.nxt_y.index, self.data.cur_y.index,
        );

        // First check if the minorant needs to be extended.
        if let Some(ref sgext) = self.sgext {
            if minorant.dim() < self.data.cur_y.point.len() {
                sgext
                    .extend_subgradient(id.subproblem, &mut minorant)
                    .map_err(Error::SubgradientExtension)?;
            }
        }

        // Per-subproblem acceptance threshold: the expected progress is
        // distributed uniformly over the subproblems.
        let accept_factor =
            self.params.imprecision_factor * self.params.acceptance_factor * self.data.expected_progress
                / self.problem.num_subproblems().to_f64().unwrap();

        let sub = &mut self.data.subs[id.subproblem];
        if let Some(point) = self.data.candidates.get_mut(id.candidate_index) {
            // center the minorant at 0
            minorant.move_center(-1.0, &point.point);

            // Add the minorant to the submodel and correct the cached
            // aggregated candidate/center values by the induced change.
            let old_can_val = sub.get_guess_value(&self.data.nxt_y).value;
            let old_cen_val = sub.get_lower_bound(&self.data.cur_y);
            sub.add_minorant(point, &Arc::new(minorant.clone()), accept_factor);
            let new_can_val = sub.get_guess_value(&self.data.nxt_y).value;
            let new_cen_val = sub.get_lower_bound(&self.data.cur_y);

            self.data.nxt_val += new_can_val - old_can_val;
            self.data.cur_val += new_cen_val - old_cen_val;

            // center the minorant at the current center
            minorant.move_center(1.0, &self.data.cur_y.point);
        } else {
            warn!("Ignore unknown candidate index:{}", id.candidate_index);
            return Ok(false);
        }

        // add minorant to master problem
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        master.add_minorant(id.subproblem, minorant)?;
        self.master_has_changed = true;

        self.maybe_do_step(false)
    }

    /// Handle a response `master_res` from the master problem process.
    ///
    /// The master response is the new candidate point. The method updates the
    /// algorithm state with the data of the new candidate (e.g. the model value
    /// `nxt_mod` in the point or the expected progress). Then it tests whether
    /// a termination criterion is satisfied. This is only the case if there is
    /// no pending problem update.
    ///
    /// Finally the master problem starts the evaluation of all subproblems at
    /// the new candidate.
    ///
    /// The new candidate is immediately checked for a potential new test.
    ///
    /// Return values
    ///   - `Ok(true)` if the termination criterion has been satisfied,
    ///   - `Ok(false)` if the termination criterion has not been satisfied,
    ///   - `Err(_)` on error.
    fn handle_master_response(&mut self, master_res: MasterResponse<P, M::MasterProblem>) -> Result<bool, P, M> {
        match master_res {
            Response::Error(err) => return Err(err.into()),
            Response::Result {
                nxt_mod,
                sgnorm,
                cnt_updates,
                nxt_d,
                center_index,
            } => {
                // Store direction, model value, aggregated subgradient norm
                // and the update counter reported by the master.
                self.data.nxt_d = Arc::new(nxt_d);
                self.data.nxt_mod = nxt_mod;
                self.data.sgnorm = sgnorm;
                self.data.cnt_updates = cnt_updates;

                debug!(
                    "Master Response current_center:{} current_candidate:{} res_center:{} nxt_mod:{} sgnorm:{}",
                    self.data.cur_y.index, self.data.nxt_y.index, center_index, self.data.nxt_mod, self.data.sgnorm
                );
            }
        };

        // Progress predicted by the model: center value minus the model value
        // at the candidate.
        self.data.expected_progress = self.data.cur_val - self.data.nxt_mod;

        self.master_running = false;
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;

        // If this is the very first solution of the model,
        // we use its result as to make a good guess for the initial weight
        // of the proximal term and resolve.
        if self.data.cur_weight.is_infinite() {
            self.data.cur_weight = self.weighter.initial_weight(&self.data);
            master.set_weight(self.data.cur_weight)?;
            self.master_has_changed = true;
            self.update_candidate()?;
            return Ok(false);
        }

        // Compress bundle
        master.compress()?;

        // Check if new variables had been added. In this case, resize cur_y.
        match self.data.nxt_d.len().cmp(&self.data.cur_y.point.len()) {
            Ordering::Greater => {
                // The direction is longer than the center: pad the center with zeros.
                let nnew = self.data.nxt_d.len() - self.data.cur_y.point.len();
                Arc::make_mut(&mut self.data.cur_y.point).extend(repeat(Real::zero()).take(nnew));
            }
            Ordering::Less => {
                // The center is longer than the direction: pad the direction with zeros.
                let nnew = self.data.cur_y.point.len() - self.data.nxt_d.len();
                Arc::make_mut(&mut self.data.nxt_d).extend(repeat(Real::zero()).take(nnew));
            }
            _ => (),
        }

        // With a zero subgradient norm no new candidate is generated.
        if self.data.sgnorm.is_zero() {
            return Ok(false);
        }

        // Compute new candidate.
        let mut next_y = dvec![];
        next_y.add(&self.data.cur_y.point, &self.data.nxt_d);
        #[cfg(debug_assertions)]
        {
            // Debug-only diagnostic: how far did the candidate move?
            if self.data.nxt_y.point.len() == next_y.len() {
                let mut diff = self.data.nxt_y.point.as_ref().clone();
                diff.add_scaled(-1.0, &next_y);
                debug!("Candidates move distance:{}", diff.norm2());
            }
        }
        self.data.nxt_y.point = Arc::new(next_y);
        self.data.nxt_y.index += 1;

        // Add the new candidate to the list of candidates.
        debug_assert_eq!(self.data.nxt_y.index, self.data.candidates.len());
        self.data.candidates.push(self.data.nxt_y.clone());

        // Update the candidate in all submodels and
        // compute first guess value for new candidate.
        let accept_factor =
            self.params.imprecision_factor * self.params.acceptance_factor * self.data.expected_progress
                / self.problem.num_subproblems().to_f64().unwrap();
        self.data.nxt_val = Real::zero();
        for sub in self.data.subs.iter_mut() {
            sub.update_candidate(&self.data.nxt_y, accept_factor);
            self.data.nxt_val += sub.get_guess_value(&self.data.nxt_y).value;
        }

        // Start evaluation of all subproblems at the new candidate.
        for i in 0..self.data.subs.len() {
            self.evaluate_subproblem(i)?;
        }

        self.maybe_do_step(true)
    }

    /// Do a descent or null step if precision is sufficient.
    ///
    /// Also checks if the termination criterion is satisfied.
    ///
    /// Return values
    ///   - `Ok(true)` if the termination criterion has been satisfied,
    ///   - `Ok(false)` if the termination criterion has not been satisfied,
    ///   - `Err(_)` on error.
    fn maybe_do_step(&mut self, check_termination: bool) -> Result<bool, P, M> {
        // No step if there is no real new candidate
        if self.data.nxt_y.index == self.data.cur_y.index {
            return Ok(false);
        }

        // Number of subproblems whose current evaluation is not yet accurate
        // enough for the asynchronous acceptance test.
        self.data.num_insufficient_candidates = self.data.subs.iter().filter(|s| !s.is_close_enough).count();

        let nxt_y = &self.data.nxt_y;

        // A synchronized step requires exact candidates.
        if self.data.force_synchronized {
            let num_exact = self
                .data
                .subs
                .iter_mut()
                .map(|s| s.get_guess_value(nxt_y).is_exact())
                .filter(|&is_exact| is_exact)
                .count();
            if num_exact < self.data.subs.len() {
                // Inexact candidate -> do not compute a new candidate but wait
                // for all evaluations.
                return Ok(false);
            }
        }

        // NOTE(review): this recomputes the count from the synchronized
        // branch above; `get_guess_value` takes `&mut self`, so the two
        // calls are kept separate — presumably idempotent, confirm before
        // merging them.
        let num_exact = self
            .data
            .subs
            .iter_mut()
            .map(|s| s.get_guess_value(nxt_y).is_exact())
            .filter(|&is_exact| is_exact)
            .count();

        // Total distance of the guess values (diagnostic below).
        let sum_dist = self
            .data
            .subs
            .iter_mut()
            .map(|s| s.get_guess_value(nxt_y).dist)
            .sum::<Real>();

        // TODO: maybe we should wait for *minorants* to be exact in the
        // candidate, so that the guess-value and the lower bound in the
        // candidate are guaranteed to be equal? (This might be relevant for the
        // termination test ... I'm not sure about this right now ...)
        debug!(
            "Number of insufficient subproblems: {} num exact: {} sum dist: {}",
            self.data.num_insufficient_candidates, num_exact, sum_dist,
        );

        // Per-subproblem precision report (debug builds of the log only).
        if log_enabled!(Debug) {
            let accept_factor =
                self.params.imprecision_factor * self.params.acceptance_factor * self.data.expected_progress
                    / self.problem.num_subproblems().to_f64().unwrap();
            for (fidx, sub) in self.data.subs.iter_mut().enumerate() {
                debug!(
                    "  {}sufficient fidx:{} dist:{} required:{}",
                    if sub.is_close_enough { "  " } else { "in" },
                    fidx,
                    sub.get_guess_value(nxt_y).dist,
                    if sub.l_guess.is_zero() {
                        Real::infinity()
                    } else {
                        accept_factor / sub.l_guess
                    }
                );
            }
        }

        // test if we should terminate
        if check_termination && self.terminator.terminate(&self.data) && self.data.cnt_descent > 2 {
            // Terminate only with exact center.
            if self.data.num_inexact_center.is_zero() {
                self.show_info(Step::Term);

                if self.data.update_in_progress || self.data.need_update {
                    info!("Termination criterion satisfied, waiting for pending update");
                    self.data.pending_termination = true;
                } else {
                    info!("Termination criterion satisfied");
                    return Ok(true);
                }
            } else {
                // The termination criterion has been satisfied, but the current
                // center evaluations are not exact. We force the current
                // *candidate* evaluations to be exact and do a forced descent step.
                // This causes the next center to be exact and hopefully satisfying
                // the termination criterion, too.
                let num_inexact = self.data.subs.len() - num_exact;

                debug!(
                    "Termination criterion satisfied with {} inexact evaluations",
                    num_inexact
                );

                // Inexact candidates -> force a synchronized step.
                if num_inexact > 0 {
                    debug!("Switch to synchronized mode");
                    self.data.force_synchronized = true;
                    return Ok(false);
                }
            }
        }

        // Do a regular, maybe inexact, step.
        if self.data.num_insufficient_candidates == 0 {
            self.do_step()
        } else {
            Ok(false)
        }
    }

    /// Do a null-step or descent-step based on the current candidate data.
    ///
    /// A descent step is performed if the (upper bound of the) function value
    /// at the candidate is at most the descent bound; otherwise a null step
    /// is declared. In both cases the proximal weight is updated, the problem
    /// gets a chance to update, and a new candidate computation is started.
    fn do_step(&mut self) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;

        // In a forced synchronized step, any descent is accepted ...
        let descent_bnd = if !self.data.force_synchronized {
            Self::get_descent_bound(self.params.acceptance_factor, &self.data)
        } else {
            self.data.cur_val
        };

        debug!(
            "Try step from center:{} to candidate:{}",
            self.data.cur_y.index, self.data.nxt_y.index
        );

        // Test whether we do a descent step
        if self.data.nxt_val <= descent_bnd {
            debug!("Descent Step");
            debug!("  cur_val    ={}", self.data.cur_val);
            debug!("  nxt_mod    ={}", self.data.nxt_mod);
            debug!("  nxt_ub     ={}", self.data.nxt_val);
            debug!("  descent_bnd={}", descent_bnd);

            // After a successful descent step, switch back to non-synchronized
            if self.data.force_synchronized {
                debug!("Switch to unsynchronized mode");
            }
            self.data.force_synchronized = false;
            self.data.cnt_descent += 1;
            // The candidate becomes the new center.
            self.data.cur_y = self.data.nxt_y.clone();
            let cur_y = &self.data.cur_y;
            // Count subproblems whose guess at the new center is *not* exact.
            // NOTE(review): the closure yields `true` for inexact values; the
            // filter binding name `is_exact` is misleading but the count is
            // of inexact evaluations.
            self.data.num_inexact_center = self
                .data
                .subs
                .iter_mut()
                .map(|s| !s.get_guess_value(cur_y).is_exact())
                .filter(|&is_exact| is_exact)
                .count();

            // Note that we must update the weight *before* we
            // change the internal data, so the old information
            // that caused the descent step is still available.
            self.data.cur_weight = self.weighter.descent_weight(&self.data);
            // The new value in the center is the model value in the candidate.
            // In particular, it is a lower bound on the real function value.
            self.data.cur_val = self
                .data
                .subs
                .iter_mut()
                .map(|s| s.get_lower_bound(cur_y))
                .sum::<Real>();

            // Check if the progress of the last decent step was large enough
            // when using the lower bound in the center instead of the former
            // guess value.
            let error = self.data.subs.iter_mut().map(SubData::error_estimate).sum::<Real>();
            let update_l_guess = error > self.data.error_bound;

            // save new error bound
            self.data.error_bound = self.data.expected_progress * self.params.acceptance_factor;

            // Move all subproblems.
            self.data.nxt_val = Real::zero();
            for sub in &mut self.data.subs {
                sub.move_center(
                    &self.data.cur_y,
                    // Tighten or relax the guessed Lipschitz constant
                    // depending on whether the error bound was exceeded.
                    if update_l_guess {
                        UpdateL::Increase(self.params.l_guess_factor)
                    } else {
                        UpdateL::Decrease(self.params.l_guess_factor * 0.5)
                    },
                );
                self.data.nxt_val += sub.get_guess_value(&self.data.nxt_y).value;
            }
            self.data.need_update = true;

            // Inform the master problem about the new center and weight.
            master.move_center(1.0, self.data.nxt_d.clone(), self.data.cur_y.index)?;
            master.set_weight(self.data.cur_weight)?;
            self.master_has_changed = true;

            self.show_info(Step::Descent);
            self.update_problem(Step::Descent)?;

            // We need a new candidate.
            self.update_candidate()?;
            Ok(self.data.cnt_descent >= self.data.max_iter)
        } else {
            debug!("Null Step nxt_val:{} descent_bnd:{}", self.data.nxt_val, descent_bnd);
            // No descent-step, so this is declared a null step
            self.data.cur_weight = self.weighter.null_weight(&self.data);
            self.show_info(Step::Null);
            self.update_problem(Step::Null)?;

            // After a null step we need a new candidate, too.
            self.update_candidate()?;
            Ok(false)
        }
    }

    /// Start evaluation of a subproblem if it is not running.
    ///
    /// The evaluation is started at the current candidate `nxt_y`, but only
    /// if the subproblem is idle and has not been evaluated at this candidate
    /// before. The result is reported back asynchronously through the client
    /// channel.
    ///
    /// Returns `true` iff a new evaluation has been started.
    fn evaluate_subproblem(&mut self, subproblem: usize) -> Result<bool, P, M> {
        let sub = &mut self.data.subs[subproblem];
        if !sub.is_running && sub.last_eval_index < self.data.nxt_y.index {
            sub.is_running = true;
            // The guard above guarantees `last_eval_index < nxt_y.index`, so
            // the candidate index can be assigned directly (no `max` needed).
            sub.last_eval_index = self.data.nxt_y.index;
            self.problem
                .evaluate(
                    subproblem,
                    self.data.nxt_y.point.clone(),
                    ChannelResultSender::new(
                        EvalId {
                            subproblem,
                            candidate_index: self.data.nxt_y.index,
                        },
                        self.client_tx.as_ref().ok_or(Error::NotInitialized)?.clone(),
                    ),
                )
                .map_err(Error::Evaluation)?;
            sub.cnt_evals += 1;
            debug!("Evaluate fidx:{} candidate:{}", subproblem, self.data.nxt_y.index);
            Ok(true)
        } else {
            // An idle subproblem that is already up to date must have produced
            // a sufficiently accurate evaluation.
            assert!(sub.is_running || sub.is_close_enough);
            Ok(false)
        }
    }

    /// Possibly start a new master process computation.
    fn update_candidate(&mut self) -> Result<(), P, M> {
        if !self.master_running && self.master_has_changed {
            debug!("Start master problem");
            self.master_running = true;
            self.master_has_changed = false;
            self.master_proc
                .as_mut()
                .ok_or(Error::NotInitialized)?
                .solve(self.data.cur_val)?;
        }
        Ok(())
    }

    /// Update the problem after descent or null `step`.
    ///
    /// After a successful descent or null step the problem gets a chance to
    /// update, i.e. to change some problem properties like adding new
    /// variables. This method starts the update process.
    ///
    /// Return values
    ///   - `Ok(true)` if a new update process has been started,
    ///   - `Ok(false)` if there is already a running update process (only one
    ///     is allowed at the same time),
    ///   - `Err(_)` on error.
    fn update_problem(&mut self, step: Step) -> Result<bool, P, M> {
        // only one update may be running at the same time
        if self.data.update_in_progress {
            return Ok(false);
        }

        // Ok, we are doing a new update now ...
        self.data.need_update = false;

        let master_proc = self.master_proc.as_mut().unwrap();
        self.problem
            .update(
                UpdateData {
                    cur_y: self.data.cur_y.point.clone(),
                    nxt_y: self.data.nxt_y.point.clone(),
                    step,
                    primal_aggrs: (0..self.problem.num_subproblems())
                        .map(|i| master_proc.get_aggregated_primal(i))
                        .map(|p| p.map_err(Error::from))
                        .collect::<std::result::Result<Vec<_>, _>>()?,
                },
                ChannelUpdateSender::new(
                    EvalId {
                        subproblem: 0,
                        candidate_index: self.data.cur_y.index,
                    },
                    self.client_tx.clone().ok_or(Error::NotInitialized)?,
                ),
            )
            .map_err(Error::Update)?;
        self.data.update_in_progress = true;
        Ok(true)
    }

    /// Handles an update response `update` of the problem.
    ///
    /// The method is called if the problem informs the solver about a change of
    /// the problem, e.g. adding a new variable. This method informs the other
    /// parts of the solver, e.g. the master problem, about the modification.
    ///
    /// The method returns `Ok(true)` if the master problem has been modified.
    fn handle_update_response(&mut self, update: Update<EvalId, P::Minorant, P::Err>) -> Result<UpdateResult, P, M> {
        let mut modified = false;
        match update {
            Update::AddVariables { bounds, sgext, .. } => {
                if !bounds.is_empty() {
                    eprintln!("Add variables {}", bounds.len());
                    let sgext: Arc<dyn SubgradientExtender<_, _>> = sgext.into();
                    // add new variables
                    let nnew = bounds.len();
                    self.master_proc
                        .as_mut()
                        .unwrap()
                        .add_vars(bounds.into_iter().map(|(l, u)| (l, u)).collect(), sgext.clone())?;
                    self.master_has_changed = true;
                    for sub in &mut self.data.subs {
                        sub.add_vars(nnew, sgext.as_ref())
                            .map_err(Error::SubgradientExtension)?;
                    }
                    self.sgext = Some(sgext);
                    let n = self.data.cur_y.point.len() + nnew;
                    Arc::make_mut(&mut self.data.cur_y.point).resize(n, Real::zero());
                    Arc::make_mut(&mut self.data.nxt_y.point).resize(n, Real::zero());
                    for c in &mut self.data.candidates {
                        Arc::make_mut(&mut c.point).resize(n, Real::zero());
                    }
                    modified = true;
                    if self.data.pending_termination {
                        info!("Cancel pending termination due to problem update");
                        self.data.pending_termination = false;
                    }
                }
            }
            Update::Done { .. } => {
                // currently we do nothing ...
                self.data.update_in_progress = false;
                // Pending termination?
                if self.data.pending_termination {
                    info!("No update after termination criterion has been satisfied, so terminate");
                    return Ok(UpdateResult::Term);
                }
            }
            Update::Error { err, .. } => {
                self.data.update_in_progress = false;
                return Err(Error::Update(err));
            }
        }

        Ok(if modified {
            UpdateResult::Modified
        } else {
            UpdateResult::Unmodified
        })
    }

    /// Return the bound the function value must be below of to enforce a descent step.
    ///
    /// If the oracle guarantees that $f(\bar{y}) \le$ this bound, the
    /// bundle method will perform a descent step.
    ///
    /// This value is $f(\hat{y}) - \varrho \cdot \Delta$ where
    /// $\Delta = f(\hat{y}) - \hat{f}(\bar{y})$ is the expected
    /// progress and $\varrho$ is the `acceptance_factor`.
    /// (The code computes `cur_val - acceptance_factor * (cur_val - nxt_mod)`,
    /// i.e. the bound lies *below* the center value by a $\varrho$-fraction of
    /// the expected progress.)
    fn get_descent_bound(acceptance_factor: Real, data: &SolverData<P::Minorant, P::Err>) -> Real {
        data.cur_val - acceptance_factor * (data.cur_val - data.nxt_mod)
    }

    /// Log some information about the latest `step`.
    ///
    /// Emits one `info!` line with: elapsed time, descent counter, update
    /// counter, total evaluation count, a `*` marker for descent steps,
    /// the current weight, the expected progress, the actual progress
    /// (`cur_val - nxt_val`), the model value and the guess value at the
    /// candidate, and the center value and aggregated center guess value.
    /// Termination steps are prefixed with `_endit` instead of `endit `.
    fn show_info(&self, step: Step) {
        let time = self.start_time.elapsed();
        info!(
            "{} {:0>2}:{:0>2}:{:0>2}.{:0>2} {:4} {:4} {:4}{:1}  {:9.4} {:9.4}({:9.4}) \
             {:12.6}({:12.6}) {:12.6}({:12.6})",
            if step == Step::Term { "_endit" } else { "endit " },
            // Elapsed wall-clock time as HH:MM:SS.hh.
            time.as_secs() / 3600,
            (time.as_secs() / 60) % 60,
            time.as_secs() % 60,
            time.subsec_nanos() / 10_000_000,
            self.data.cnt_descent,
            self.data.cnt_updates,
            // Total number of subproblem evaluations so far.
            self.data.subs.iter().map(|s| s.cnt_evals).sum::<usize>(),
            if step == Step::Descent { "*" } else { " " },
            PrettyPrintFloat(self.data.cur_weight),
            PrettyPrintFloat(self.data.expected_progress()),
            PrettyPrintFloat(self.data.cur_val - self.data.nxt_val),
            PrettyPrintFloat(self.data.nxt_mod),
            PrettyPrintFloat(self.data.nxt_val),
            PrettyPrintFloat(self.data.cur_val),
            PrettyPrintFloat(self.data.subs.iter().map(|s| s.center_guess_value()).sum::<Real>()),
        );
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<<P::Minorant as Minorant>::Primal, P, M> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }
}