RsBundle  Artifact [200ef894e4]

Artifact 200ef894e42aef8e8243e74a7aa797a93ab33103:

  • File src/parallel/solver.rs — part of check-in [8dc408e351] at 2019-07-20 12:20:53 on branch weighter — Implement new generic weigther interface (user: fifr size: 26308) [more...]

/*
 * Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! An asynchronous parallel bundle solver.

use crossbeam::channel::{select, unbounded as channel, Receiver, Sender};
use log::{debug, info, warn};
use num_cpus;
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Minorant, Real};

use super::problem::{EvalResult, FirstOrderProblem};
use crate::master::{BoxedMasterProblem, CplexMaster, MasterProblem, UnconstrainedMasterProblem};
use crate::solver::{BundleState, SolverParams, StandardTerminator, Step, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

type MasterProblemError = <BoxedMasterProblem<CplexMaster> as MasterProblem>::Err;

/// Error raised by the parallel bundle [`Solver`].
#[derive(Debug)]
pub enum Error<E> {
    /// An error raised by the master problem process.
    Master(MasterProblemError),
    /// The iteration limit has been reached.
    IterationLimit { limit: usize },
    /// An error raised by a subproblem evaluation.
    Evaluation(E),
    /// The dimension of some data is wrong.
    Dimension(String),
    /// An error occurred in a subprocess.
    Process(Box<dyn std::error::Error>),
    /// A method requiring an initialized solver has been called.
    NotInitialized,
}

impl<E> std::fmt::Display for Error<E>
where
    E: std::fmt::Display,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
        use Error::*;
        match self {
            Master(err) => writeln!(fmt, "Error in master problem: {}", err),
            IterationLimit { limit } => writeln!(fmt, "The iteration limit has been reached: {}", limit),
            Evaluation(err) => writeln!(fmt, "Error in subproblem evaluation: {}", err),
            Dimension(what) => writeln!(fmt, "Wrong dimension for {}", what),
            Process(err) => writeln!(fmt, "Error in subprocess: {}", err),
            NotInitialized => writeln!(fmt, "The solver must be initialized (called Solver::init()?)"),
        }
    }
}

impl<E> std::error::Error for Error<E>
where
    E: std::error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            Master(err) => Some(err),
            Evaluation(err) => Some(err),
            Process(err) => Some(err.as_ref()),
            _ => None,
        }
    }
}

/// Information about a minorant.
#[derive(Debug, Clone)]
struct MinorantInfo<Pr> {
    /// The minorant's index in the master problem
    index: usize,
    /// Current multiplier.
    multiplier: Real,
    /// Primal associated with this minorant.
    primal: Option<Pr>,
}

/// Configuration information for setting up a master problem.
struct MasterConfig {
    /// The number of subproblems.
    num_subproblems: usize,
    /// The number of variables.
    num_vars: usize,
    /// The lower bounds on the variables.
    lower_bounds: Option<DVector>,
    /// The lower bounds on the variables.
    upper_bounds: Option<DVector>,
    /// The maximal number of inner updates.
    max_updates: usize,
}

/// A task for the master problem.
enum MasterTask<Pr> {
    /// Add a new minorant for a subfunction to the master problem.
    AddMinorant(usize, Minorant, Pr),

    /// Move the center of the master problem in the given direction.
    MoveCenter(Real, Arc<DVector>),

    /// Start a new computation of the master problem.
    Solve { center_value: Real },

    /// Compress the bundle.
    Compress,

    /// Set the weight parameter of the master problem.
    SetWeight { weight: Real },
}

/// The response send from a master process.
///
/// The response contains the evaluation results of the latest
struct MasterResponse {
    nxt_d: DVector,
    nxt_mod: Real,
    sgnorm: Real,
}

type MasterSender = Sender<std::result::Result<MasterResponse, MasterProblemError>>;

type MasterReceiver<Pr> = Receiver<MasterTask<Pr>>;

type ClientSender<P> =
    Sender<std::result::Result<EvalResult<usize, <P as FirstOrderProblem>::Primal>, <P as FirstOrderProblem>::Err>>;

type ClientReceiver<P> =
    Receiver<std::result::Result<EvalResult<usize, <P as FirstOrderProblem>::Primal>, <P as FirstOrderProblem>::Err>>;

/// Parameters for tuning the solver.
pub type Parameters = SolverParams;

pub struct SolverData {
    /// Current center of stability.
    cur_y: DVector,

    /// Function value in the current point.
    cur_val: Real,

    /// Function value at the current candidate.
    nxt_val: Real,

    /// Model value at the current candidate.
    nxt_mod: Real,

    /// The value of the new minorant in the current center.
    new_cutval: Real,

    /// Norm of current aggregated subgradient.
    sgnorm: Real,

    /// The currently used master problem weight.
    cur_weight: Real,
}

impl HKWeightable for SolverData {
    fn current_weight(&self) -> Real {
        self.cur_weight
    }

    fn center(&self) -> &DVector {
        &self.cur_y
    }

    fn center_value(&self) -> Real {
        self.cur_val
    }

    fn candidate_value(&self) -> Real {
        self.nxt_val
    }

    fn candidate_model(&self) -> Real {
        self.nxt_mod
    }

    fn new_cutvalue(&self) -> Real {
        self.new_cutval
    }

    fn sgnorm(&self) -> Real {
        self.sgnorm
    }
}

/// Implementation of a parallel bundle method.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter>
where
    P: FirstOrderProblem,
    T: Terminator,
{
    /// Parameters for the solver.
    pub params: Parameters,

    /// Termination predicate.
    pub terminator: T,

    /// Weighter heuristic.
    pub weighter: W,

    /// The first order problem.
    problem: P,

    /// The algorithm data.
    data: SolverData,

    /// The threadpool of the solver.
    threadpool: ThreadPool,

    /// The channel to transmit new tasks to the master problem.
    master_tx: Option<Sender<MasterTask<P::Primal>>>,

    /// The channel to receive solutions from the master problem.
    master_rx: Option<Receiver<std::result::Result<MasterResponse, MasterProblemError>>>,

    /// The channel to receive the evaluation results from subproblems.
    client_tx: Option<ClientSender<P>>,

    /// The channel to receive the evaluation results from subproblems.
    client_rx: Option<ClientReceiver<P>>,

    /// Number of descent steps.
    cnt_descent: usize,

    /// Number of null steps.
    cnt_null: usize,

    /// Number of function evaluation.
    cnt_evals: usize,

    /// Time when the solution process started.
    ///
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W> Solver<P, T, W>
where
    P: FirstOrderProblem,
    P::Primal: Send + Sync + 'static,
    P::Err: std::error::Error + Send + Sync + 'static,
    T: Terminator + Default,
    W: Weighter<SolverData> + Default,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self {
        Solver {
            params: Parameters::default(),
            terminator: Default::default(),
            weighter: Default::default(),
            problem,
            data: SolverData {
                cur_y: dvec![],
                cur_val: 0.0,
                nxt_val: 0.0,
                nxt_mod: 0.0,
                new_cutval: 0.0,
                sgnorm: 0.0,
                cur_weight: 1.0,
            },

            threadpool: ThreadPool::with_name("Parallel bundle solver".to_string(), num_cpus::get()),
            master_tx: None,
            master_rx: None,
            client_tx: None,
            client_rx: None,

            cnt_descent: 0,
            cnt_null: 0,
            cnt_evals: 0,

            start_time: Instant::now(),
        }
    }

    /// Return the underlying threadpool.
    ///
    /// In order to use the same threadpool for concurrent processes,
    /// just clone the returned `ThreadPool`.
    pub fn threadpool(&self) -> &ThreadPool {
        &self.threadpool
    }

    /// Set the threadpool.
    ///
    /// This function allows to use a specific threadpool for all processes
    /// spawned by the solver. Note that this does not involve any threads
    /// used by the problem because the solver is not responsible for executing
    /// the evaluation process of the subproblems. However, the problem might
    /// use the same threadpool as the solver.
    pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
        self.threadpool = threadpool;
    }

    /// Initialize the solver.
    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`solve`].
    pub fn init(&mut self) -> Result<(), Error<P::Err>> {
        debug!("Initialize solver");

        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();

        self.data.cur_y.init0(n);
        self.cnt_descent = 0;
        self.cnt_null = 0;
        self.cnt_evals = 0;

        let (tx, rx) = channel();
        self.client_tx = Some(tx);
        self.client_rx = Some(rx);

        let (tx, rx) = channel();
        let (rev_tx, rev_rx) = channel();

        self.master_tx = Some(tx);
        self.master_rx = Some(rev_rx);

        let master_config = MasterConfig {
            num_subproblems: m,
            num_vars: n,
            lower_bounds: self.problem.lower_bounds().map(DVector),
            upper_bounds: self.problem.upper_bounds().map(DVector),
            max_updates: self.params.max_updates,
        };

        if master_config
            .lower_bounds
            .as_ref()
            .map(|lb| lb.len() != n)
            .unwrap_or(false)
        {
            return Err(Error::Dimension("lower bounds".to_string()));
        }
        if master_config
            .upper_bounds
            .as_ref()
            .map(|ub| ub.len() != n)
            .unwrap_or(false)
        {
            return Err(Error::Dimension("upper bounds".to_string()));
        }

        debug!("Start master process");
        self.threadpool.execute(move || {
            debug!("Master process started");
            let mut rev_tx = rev_tx;
            if let Err(err) = Self::master_main(master_config, &mut rev_tx, rx) {
                #[allow(unused_must_use)]
                {
                    rev_tx.send(Err(err));
                }
            }
            debug!("Master proces stopped");
        });

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), i, self.client_tx.clone().unwrap())
                .map_err(Error::Evaluation)?;
        }

        let mut have_minorants = vec![false; m];
        let mut center_values: Vec<Option<Real>> = vec![None; m];
        let mut cnt_remaining_objs = m;
        let mut cnt_remaining_mins = m;
        let master_tx = self.master_tx.as_ref().unwrap();
        for m in self.client_rx.as_ref().unwrap() {
            match m {
                Ok(EvalResult::ObjectiveValue { index: i, value }) => {
                    debug!("Receive objective from subproblem {}: {}", i, value);
                    if center_values[i].is_none() {
                        cnt_remaining_objs -= 1;
                        center_values[i] = Some(value);
                    }
                }
                Ok(EvalResult::Minorant {
                    index: i,
                    minorant,
                    primal,
                }) => {
                    debug!("Receive minorant from subproblem {}", i);
                    master_tx
                        .send(MasterTask::AddMinorant(i, minorant, primal))
                        .map_err(|err| Error::Process(err.into()))?;
                    if !have_minorants[i] {
                        have_minorants[i] = true;
                        cnt_remaining_mins -= 1;
                    }
                }
                Err(err) => return Err(Error::Evaluation(err)),
            };
            if cnt_remaining_mins == 0 && cnt_remaining_objs == 0 {
                break;
            }
        }

        self.data.cur_weight = Real::infinity(); // gets initialized when the master problem is complete
        master_tx
            .send(MasterTask::SetWeight { weight: 1.0 })
            .map_err(|err| Error::Process(err.into()))?;
        master_tx
            .send(MasterTask::Solve {
                center_value: self.data.cur_val,
            })
            .map_err(|err| Error::Process(err.into()))?;

        debug!("Initialization complete");

        self.start_time = Instant::now();

        Ok(())
    }

    fn master_main(
        master_config: MasterConfig,
        tx: &mut MasterSender,
        rx: MasterReceiver<P::Primal>,
    ) -> std::result::Result<(), MasterProblemError> {
        let mut master = CplexMaster::new().map(BoxedMasterProblem::with_master)?;
        let mut minorants: Vec<MinorantInfo<P::Primal>> = vec![];

        // Initialize the master problem.
        master.set_num_subproblems(master_config.num_subproblems)?;
        master.set_vars(
            master_config.num_vars,
            master_config.lower_bounds,
            master_config.upper_bounds,
        )?;
        master.set_max_updates(master_config.max_updates)?;

        // The main iteration: wait for new tasks.
        for m in rx {
            match m {
                MasterTask::AddMinorant(i, m, primal) => {
                    debug!("master: add minorant to subproblem {}", i);
                    let index = master.add_minorant(i, m)?;
                    minorants.push(MinorantInfo {
                        index,
                        multiplier: 0.0,
                        primal: Some(primal),
                    });
                }
                MasterTask::MoveCenter(alpha, d) => {
                    debug!("master: move center");
                    master.move_center(alpha, &d);
                }
                MasterTask::Compress => {
                    debug!("Compress bundle");
                    warn!("Bundle compression not yet implemented");
                }
                MasterTask::Solve { center_value } => {
                    debug!("master: solve with center_value {}", center_value);
                    master.solve(center_value)?;
                    let master_response = MasterResponse {
                        nxt_d: master.get_primopt(),
                        nxt_mod: master.get_primoptval(),
                        sgnorm: master.get_dualoptnorm2().sqrt(),
                    };
                    if let Err(err) = tx.send(Ok(master_response)) {
                        warn!("Master process cancelled because of channel error: {}", err);
                        break;
                    }
                }
                MasterTask::SetWeight { weight } => {
                    debug!("master: set weight {}", weight);
                    master.set_weight(weight)?;
                }
            };
        }

        Ok(())
    }

    /// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
    pub fn solve(&mut self) -> Result<(), Error<P::Err>> {
        self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
    }

    /// Solve the problem with a maximal iteration limit.
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), Error<P::Err>> {
        // First initialize the internal data structures.
        self.init()?;

        if self.solve_iter(limit)? {
            Ok(())
        } else {
            Err(Error::IterationLimit { limit })
        }
    }

    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied. Otherwise it returns `Ok(false)` or an
    /// error code.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. Because of this one *must*
    /// call `init()` before the first call to this function.
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, Error<P::Err>> {
        debug!("Start solving up to {} iterations", niter);
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        let client_rx = self.client_rx.as_ref().ok_or(Error::NotInitialized)?;
        let master_tx = self.master_tx.as_ref().ok_or(Error::NotInitialized)?;
        let master_rx = self.master_rx.as_ref().ok_or(Error::NotInitialized)?;

        let mut cnt_iter = 0;
        let mut nxt_ubs = vec![Real::infinity(); self.problem.num_subproblems()];
        let mut cnt_remaining_ubs = self.problem.num_subproblems();
        let mut nxt_cutvals = vec![-Real::infinity(); self.problem.num_subproblems()];
        let mut cnt_remaining_mins = self.problem.num_subproblems();
        let mut nxt_d = Arc::new(dvec![]);
        let mut nxt_y = Arc::new(dvec![]);
        let mut expected_progress = 0.0;

        loop {
            select! {
                recv(client_rx) -> msg => {
                    let msg = msg
                        .map_err(|err| Error::Process(err.into()))?
                        .map_err(Error::Evaluation)?;
                    match msg {
                        EvalResult::ObjectiveValue { index, value } => {
                                debug!("Receive objective from subproblem {}: {}", index, value);
                            if nxt_ubs[index].is_infinite() {
                                cnt_remaining_ubs -= 1;
                            }
                            nxt_ubs[index] = nxt_ubs[index].min(value);
                        }
                        EvalResult::Minorant { index, mut minorant, primal } => {
                            debug!("Receive minorant from subproblem {}", index);
                            if nxt_cutvals[index].is_infinite() {
                                cnt_remaining_mins -= 1;
                            }
                            // move center of minorant to cur_y
                            minorant.move_center(-1.0, &nxt_d);
                            nxt_cutvals[index] = nxt_cutvals[index].max(minorant.constant);
                            // add minorant to master problem
                            master_tx
                                .send(MasterTask::AddMinorant(index, minorant, primal))
                                .map_err(|err| Error::Process(err.into()))?;
                        }
                    }

                    if cnt_remaining_ubs == 0 && cnt_remaining_mins == 0 {
                        // All subproblems have been evaluated, do a step.

                        let nxt_ub = nxt_ubs.iter().sum::<Real>();
                        let descent_bnd = self.get_descent_bound();

                        self.data.nxt_val = nxt_ub;
                        self.data.new_cutval = nxt_cutvals.iter().sum::<Real>();

                        debug!("Step");
                        debug!("  cur_val    ={}", self.data.cur_val);
                        debug!("  nxt_mod    ={}", self.data.nxt_mod);
                        debug!("  nxt_ub     ={}", nxt_ub);
                        debug!("  descent_bnd={}", descent_bnd);


                        let step;
                        if nxt_ub <= descent_bnd {
                            step = Step::Descent;
                            self.cnt_descent += 1;

                            self.data.cur_y = nxt_y.as_ref().clone();
                            self.data.cur_val = nxt_ub;

                            master_tx
                                .send(MasterTask::MoveCenter(1.0, nxt_d.clone()))
                                .map_err(|err| Error::Process(err.into()))?;

                            debug!("Descent Step");
                            debug!("  dir ={}", nxt_d);
                            debug!("  newy={}", self.data.cur_y);
                            self.data.cur_weight = self.weighter.descent_weight(&self.data);
                        } else {
                            step = Step::Null;
                            self.cnt_null += 1;
                            self.data.cur_weight = self.weighter.null_weight(&self.data);
                        }

                        master_tx
                            .send(MasterTask::SetWeight { weight: self.data.cur_weight })
                            .map_err(|err| Error::Process(err.into()))?;

                        self.show_info(step, expected_progress, self.data.nxt_mod, nxt_ub, self.data.cur_val);
                        cnt_iter += 1;
                        if cnt_iter >= niter { break }

                        master_tx
                            .send(MasterTask::Solve { center_value: self.data.cur_val })
                            .map_err(|err| Error::Process(err.into()))?;
                    }
                },
                recv(master_rx) -> msg => {
                    debug!("Receive master response");
                    // Receive result (new candidate) from the master
                    let master_res = msg
                        .map_err(|err| Error::Process(err.into()))?
                        .map_err(Error::Master)?;

                    if self.data.cur_weight < Real::infinity() && self.terminator.terminate(&BundleState {
                        cur_y: &self.data.cur_y,
                        cur_val: self.data.cur_val,
                        nxt_y: &nxt_y,
                        nxt_mod: self.data.nxt_mod,
                        nxt_val: self.data.cur_val,    // hopefully does not matter
                        new_cutval: self.data.cur_val, // hopefully does not matter
                        sgnorm: self.data.sgnorm,
                        weight: self.data.cur_weight,
                        step: Step::Term,
                        expected_progress,
                    }, &self.params) {
                        info!("Termination criterion satisfied");
                        return Ok(true)
                    }

                    // Compress bundle
                    master_tx.send(MasterTask::Compress).map_err(|err| Error::Process(err.into()))?;

                    // Compute new candidate.
                    self.data.nxt_mod = master_res.nxt_mod;
                    expected_progress = self.data.cur_val - self.data.nxt_mod;
                    self.data.sgnorm = master_res.sgnorm;
                    let mut next_y = dvec![];
                    nxt_d = Arc::new(master_res.nxt_d);
                    next_y.add(&self.data.cur_y, &nxt_d);
                    nxt_y = Arc::new(next_y);

                    // Reset evaluation data.
                    nxt_ubs.clear();
                    nxt_ubs.resize(self.problem.num_subproblems(), Real::infinity());
                    cnt_remaining_ubs = self.problem.num_subproblems();
                    nxt_cutvals.clear();
                    nxt_cutvals.resize(self.problem.num_subproblems(), -Real::infinity());
                    cnt_remaining_mins = self.problem.num_subproblems();

                    // Start evaluation of all subproblems at the new candidate.
                    for i in 0..self.problem.num_subproblems() {
                        self.problem.evaluate(i, nxt_y.clone(), i, client_tx.clone()) .map_err(Error::Evaluation)?;
                    }

                    // Compute the real initial weight.
                    if self.data.cur_weight.is_infinite() {
                        let weight = self.weighter.initial_weight(&self.data);
                        self.data.cur_weight = weight;
                        master_tx
                            .send(MasterTask::SetWeight { weight })
                            .map_err(|err| Error::Process(err.into()))?;
                    }

                },
            }
        }

        Ok(false)
    }

    /// Return the bound the function value must be below of to enforce a descent step.
    ///
    /// If the oracle guarantees that $f(\bar{y}) \le$ this bound, the
    /// bundle method will perform a descent step.
    ///
    /// This value is $f(\hat{y}) + \varrho \cdot \Delta$ where
    /// $\Delta = f(\hat{y}) - \hat{f}(\bar{y})$ is the expected
    /// progress and $\varrho$ is the `acceptance_factor`.
    fn get_descent_bound(&self) -> Real {
        self.data.cur_val - self.params.acceptance_factor * (self.data.cur_val - self.data.nxt_mod)
    }

    fn show_info(&self, step: Step, expected_progress: Real, nxt_mod: Real, nxt_val: Real, cur_val: Real) {
        let time = self.start_time.elapsed();
        info!(
            "{} {:0>2}:{:0>2}:{:0>2}.{:0>2} {:4} {:4} {:4}{:1}  {:9.4} {:9.4} \
             {:12.6e}({:12.6e}) {:12.6e}",
            if step == Step::Term { "_endit" } else { "endit " },
            time.as_secs() / 3600,
            (time.as_secs() / 60) % 60,
            time.as_secs() % 60,
            time.subsec_nanos() / 10_000_000,
            self.cnt_descent,
            self.cnt_descent + self.cnt_null,
            0, /*self.master.cnt_updates(),*/
            if step == Step::Descent { "*" } else { " " },
            self.data.cur_weight,
            expected_progress,
            nxt_mod,
            nxt_val,
            cur_val
        );
    }
}