/*
* Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous parallel bundle solver.
use crossbeam::channel::{select, unbounded as channel, Receiver, Sender};
use log::{debug, info};
use num_cpus;
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::masterprocess::{MasterConfig, MasterProcess};
use super::problem::{EvalResult, FirstOrderProblem};
use crate::master::{BoxedMasterProblem, CplexMaster, MasterProblem as MP};
use crate::solver::{SolverParams, Step};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
///
/// Used by [`Solver::solve`] as the maximal number of bundle iterations
/// before [`Error::IterationLimit`] is raised.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// The concrete master problem used by this solver (CPLEX-backed, boxed).
type MasterProblem = BoxedMasterProblem<CplexMaster>;
/// The error type raised by the concrete master problem.
type MasterProblemError = <MasterProblem as MP>::Err;
/// Error raised by the parallel bundle [`Solver`].
///
/// The type parameter `E` is the error type of the subproblem oracle
/// (see [`FirstOrderProblem::Err`]).
#[derive(Debug)]
pub enum Error<E> {
    /// An error raised by the master problem process.
    Master(MasterProblemError),
    /// The iteration limit has been reached.
    IterationLimit {
        /// The iteration limit that was exceeded.
        limit: usize,
    },
    /// An error raised by a subproblem evaluation.
    Evaluation(E),
    /// The dimension of some data is wrong.
    Dimension(String),
    /// An error occurred in a subprocess.
    Process(Box<dyn std::error::Error>),
    /// A method requiring an initialized solver has been called.
    NotInitialized,
}
impl<E> std::fmt::Display for Error<E>
where
E: std::fmt::Display,
{
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
use Error::*;
match self {
Master(err) => writeln!(fmt, "Error in master problem: {}", err),
IterationLimit { limit } => writeln!(fmt, "The iteration limit has been reached: {}", limit),
Evaluation(err) => writeln!(fmt, "Error in subproblem evaluation: {}", err),
Dimension(what) => writeln!(fmt, "Wrong dimension for {}", what),
Process(err) => writeln!(fmt, "Error in subprocess: {}", err),
NotInitialized => writeln!(fmt, "The solver must be initialized (called Solver::init()?)"),
}
}
}
impl<E> std::error::Error for Error<E>
where
E: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use Error::*;
match self {
Master(err) => Some(err),
Evaluation(err) => Some(err),
Process(err) => Some(err.as_ref()),
_ => None,
}
}
}
/// Sending half of the channel on which subproblem evaluations report
/// their results (or evaluation errors) back to the solver.
type ClientSender<P> =
    Sender<std::result::Result<EvalResult<usize, <P as FirstOrderProblem>::Primal>, <P as FirstOrderProblem>::Err>>;
/// Receiving half of the subproblem evaluation-result channel.
type ClientReceiver<P> =
    Receiver<std::result::Result<EvalResult<usize, <P as FirstOrderProblem>::Primal>, <P as FirstOrderProblem>::Err>>;
/// Parameters for tuning the solver.
pub type Parameters = SolverParams;
/// The internal state of the bundle algorithm.
///
/// This state is exposed to the termination and weighting heuristics
/// through the [`StandardTerminatable`] and [`HKWeightable`] traits.
pub struct SolverData {
    /// Current center of stability.
    cur_y: DVector,
    /// Function value in the current point.
    cur_val: Real,
    /// Function value at the current candidate.
    nxt_val: Real,
    /// Model value at the current candidate.
    nxt_mod: Real,
    /// The value of the new minorant in the current center.
    new_cutval: Real,
    /// Norm of current aggregated subgradient.
    sgnorm: Real,
    /// The currently used master problem weight.
    cur_weight: Real,
}
impl StandardTerminatable for SolverData {
    /// The function value at the current center of stability.
    fn center_value(&self) -> Real {
        self.cur_val
    }
    /// The decrease predicted by the model: f(center) minus the model
    /// value at the current candidate.
    fn expected_progress(&self) -> Real {
        self.cur_val - self.nxt_mod
    }
}
impl HKWeightable for SolverData {
    /// The currently used master problem weight.
    fn current_weight(&self) -> Real {
        self.cur_weight
    }
    /// The current center of stability.
    fn center(&self) -> &DVector {
        &self.cur_y
    }
    /// The function value at the current center.
    fn center_value(&self) -> Real {
        self.cur_val
    }
    /// The function value at the current candidate.
    fn candidate_value(&self) -> Real {
        self.nxt_val
    }
    /// The model value at the current candidate.
    fn candidate_model(&self) -> Real {
        self.nxt_mod
    }
    /// The value of the newest minorant in the current center.
    fn new_cutvalue(&self) -> Real {
        self.new_cutval
    }
    /// Norm of the current aggregated subgradient.
    fn sgnorm(&self) -> Real {
        self.sgnorm
    }
}
/// Implementation of a parallel bundle method.
///
/// `P` is the first-order problem, `T` the termination heuristic and
/// `W` the weight-update heuristic.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter>
where
    P: FirstOrderProblem,
{
    /// Parameters for the solver.
    pub params: Parameters,
    /// Termination predicate.
    pub terminator: T,
    /// Weighter heuristic.
    pub weighter: W,
    /// The threadpool of the solver.
    pub threadpool: ThreadPool,
    /// The first order problem.
    problem: P,
    /// The algorithm data.
    data: SolverData,
    /// The master problem process.
    master: Option<MasterProcess<P, MasterProblem>>,
    /// The sending end handed to subproblem evaluations so they can
    /// report their results back to the solver.
    client_tx: Option<ClientSender<P>>,
    /// The channel to receive the evaluation results from subproblems.
    client_rx: Option<ClientReceiver<P>>,
    /// Number of descent steps.
    cnt_descent: usize,
    /// Number of null steps.
    cnt_null: usize,
    /// Number of function evaluations.
    /// NOTE(review): reset in `init()` but never incremented in this
    /// file — confirm it is updated elsewhere or remove it.
    cnt_evals: usize,
    /// Time when the solution process started.
    ///
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}
impl<P, T, W> Solver<P, T, W>
where
    P: FirstOrderProblem,
    P::Primal: Send + Sync + 'static,
    P::Err: std::error::Error + Send + Sync + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
{
    /// Create a new parallel bundle solver.
    ///
    /// The solver is *not* ready to iterate after construction; the worker
    /// processes and channels are only set up by [`Solver::init`] (which is
    /// called automatically by [`Solver::solve`]).
    pub fn new(problem: P) -> Self {
        Solver {
            params: Parameters::default(),
            terminator: Default::default(),
            weighter: Default::default(),
            problem,
            // Algorithm state starts from an empty center; real values are
            // filled in by `init()`.
            data: SolverData {
                cur_y: dvec![],
                cur_val: 0.0,
                nxt_val: 0.0,
                nxt_mod: 0.0,
                new_cutval: 0.0,
                sgnorm: 0.0,
                cur_weight: 1.0,
            },
            // Default pool: one worker thread per logical CPU.
            threadpool: ThreadPool::with_name("Parallel bundle solver".to_string(), num_cpus::get()),
            master: None,
            client_tx: None,
            client_rx: None,
            cnt_descent: 0,
            cnt_null: 0,
            cnt_evals: 0,
            start_time: Instant::now(),
        }
    }
    /// Return the underlying threadpool.
    ///
    /// In order to use the same threadpool for concurrent processes,
    /// just clone the returned `ThreadPool`.
    pub fn threadpool(&self) -> &ThreadPool {
        &self.threadpool
    }
    /// Set the threadpool.
    ///
    /// This function allows to use a specific threadpool for all processes
    /// spawned by the solver. Note that this does not involve any threads
    /// used by the problem because the solver is not responsible for executing
    /// the evaluation process of the subproblems. However, the problem might
    /// use the same threadpool as the solver.
    pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
        self.threadpool = threadpool;
    }
    /// Initialize the solver.
    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`solve`].
    ///
    /// # Errors
    ///
    /// Returns [`Error::Dimension`] if the problem's bound vectors do not
    /// match the number of variables, [`Error::Evaluation`] if an initial
    /// oracle evaluation fails, or a master problem error.
    pub fn init(&mut self) -> Result<(), Error<P::Err>> {
        debug!("Initialize solver");
        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();
        // Start from the origin as the initial center of stability.
        self.data.cur_y.init0(n);
        self.cnt_descent = 0;
        self.cnt_null = 0;
        self.cnt_evals = 0;
        // Fresh unbounded channel for evaluation results; any channel from a
        // previous run is dropped here.
        let (tx, rx) = channel();
        self.client_tx = Some(tx);
        self.client_rx = Some(rx);
        let master_config = MasterConfig {
            num_subproblems: m,
            num_vars: n,
            lower_bounds: self.problem.lower_bounds().map(DVector),
            upper_bounds: self.problem.upper_bounds().map(DVector),
        };
        // Validate bound dimensions before spawning any processes.
        if master_config
            .lower_bounds
            .as_ref()
            .map(|lb| lb.len() != n)
            .unwrap_or(false)
        {
            return Err(Error::Dimension("lower bounds".to_string()));
        }
        if master_config
            .upper_bounds
            .as_ref()
            .map(|ub| ub.len() != n)
            .unwrap_or(false)
        {
            return Err(Error::Dimension("upper bounds".to_string()));
        }
        debug!("Start master process");
        self.master = Some(MasterProcess::start(master_config, self.threadpool.clone()));
        let master = self.master.as_mut().unwrap();
        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            // The subproblem index `i` is passed twice — presumably once as
            // the subproblem selector and once as the result tag; confirm
            // against `FirstOrderProblem::evaluate`.
            self.problem
                .evaluate(i, y.clone(), i, self.client_tx.clone().unwrap())
                .map_err(Error::Evaluation)?;
        }
        // Wait until every subproblem has reported both an objective value
        // and at least one minorant.
        let mut have_minorants = vec![false; m];
        let mut center_values: Vec<Option<Real>> = vec![None; m];
        let mut cnt_remaining_objs = m;
        let mut cnt_remaining_mins = m;
        // Note: the loop variable `m` shadows the subproblem count above.
        for m in self.client_rx.as_ref().unwrap() {
            match m {
                Ok(EvalResult::ObjectiveValue { index: i, value }) => {
                    debug!("Receive objective from subproblem {}: {}", i, value);
                    // Count each subproblem's first objective only.
                    if center_values[i].is_none() {
                        cnt_remaining_objs -= 1;
                        center_values[i] = Some(value);
                    }
                }
                Ok(EvalResult::Minorant {
                    index: i,
                    minorant,
                    primal,
                }) => {
                    debug!("Receive minorant from subproblem {}", i);
                    master.add_minorant(i, minorant, primal)?;
                    if !have_minorants[i] {
                        have_minorants[i] = true;
                        cnt_remaining_mins -= 1;
                    }
                }
                Err(err) => return Err(Error::Evaluation(err)),
            };
            if cnt_remaining_mins == 0 && cnt_remaining_objs == 0 {
                break;
            }
        }
        // NOTE(review): `center_values` is collected above but never folded
        // into `self.data.cur_val`, which therefore stays 0.0 here — verify
        // whether the initial center value should be the sum of these values.
        self.data.cur_weight = Real::infinity(); // gets initialized when the master problem is complete
        master.set_weight(1.0)?;
        // Kick off the first master solve; its response is handled in
        // `solve_iter`.
        master.solve(self.data.cur_val)?;
        debug!("Initialization complete");
        self.start_time = Instant::now();
        Ok(())
    }
    /// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
    pub fn solve(&mut self) -> Result<(), Error<P::Err>> {
        self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
    }
    /// Solve the problem with a maximal iteration limit.
    ///
    /// Re-initializes the solver and returns [`Error::IterationLimit`] if
    /// the termination criterion is not met within `limit` iterations.
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), Error<P::Err>> {
        // First initialize the internal data structures.
        self.init()?;
        if self.solve_iter(limit)? {
            Ok(())
        } else {
            Err(Error::IterationLimit { limit })
        }
    }
    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied. Otherwise it returns `Ok(false)` or an
    /// error code.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. Because of this one *must*
    /// call `init()` before the first call to this function.
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, Error<P::Err>> {
        debug!("Start solving up to {} iterations", niter);
        let master = self.master.as_mut().ok_or(Error::NotInitialized)?;
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        let client_rx = self.client_rx.as_ref().ok_or(Error::NotInitialized)?;
        let mut cnt_iter = 0;
        // Per-subproblem bookkeeping for the current candidate:
        // `nxt_ubs[i]` is the best objective value received from subproblem
        // `i` (infinity = not yet received), `nxt_cutvals[i]` the largest
        // minorant constant (-infinity = not yet received).
        let mut nxt_ubs = vec![Real::infinity(); self.problem.num_subproblems()];
        let mut cnt_remaining_ubs = self.problem.num_subproblems();
        let mut nxt_cutvals = vec![-Real::infinity(); self.problem.num_subproblems()];
        let mut cnt_remaining_mins = self.problem.num_subproblems();
        // Direction and candidate point of the current iteration (set when
        // the master's response arrives).
        let mut nxt_d = Arc::new(dvec![]);
        let mut nxt_y = Arc::new(dvec![]);
        // Event loop: react to whichever message arrives first — an oracle
        // result from a subproblem or a new candidate from the master.
        loop {
            select! {
                recv(client_rx) -> msg => {
                    let msg = msg
                        .map_err(|err| Error::Process(err.into()))?
                        .map_err(Error::Evaluation)?;
                    match msg {
                        EvalResult::ObjectiveValue { index, value } => {
                            debug!("Receive objective from subproblem {}: {}", index, value);
                            // First value for this subproblem completes its slot.
                            if nxt_ubs[index].is_infinite() {
                                cnt_remaining_ubs -= 1;
                            }
                            nxt_ubs[index] = nxt_ubs[index].min(value);
                        }
                        EvalResult::Minorant { index, mut minorant, primal } => {
                            debug!("Receive minorant from subproblem {}", index);
                            if nxt_cutvals[index].is_infinite() {
                                cnt_remaining_mins -= 1;
                            }
                            // move center of minorant to cur_y
                            minorant.move_center(-1.0, &nxt_d);
                            nxt_cutvals[index] = nxt_cutvals[index].max(minorant.constant);
                            // add minorant to master problem
                            master.add_minorant(index, minorant, primal)?;
                        }
                    }
                    if cnt_remaining_ubs == 0 && cnt_remaining_mins == 0 {
                        // All subproblems have been evaluated, do a step.
                        let nxt_ub = nxt_ubs.iter().sum::<Real>();
                        let descent_bnd = Self::get_descent_bound(self.params.acceptance_factor, &self.data);
                        self.data.nxt_val = nxt_ub;
                        self.data.new_cutval = nxt_cutvals.iter().sum::<Real>();
                        debug!("Step");
                        debug!(" cur_val ={}", self.data.cur_val);
                        debug!(" nxt_mod ={}", self.data.nxt_mod);
                        debug!(" nxt_ub ={}", nxt_ub);
                        debug!(" descent_bnd={}", descent_bnd);
                        let step;
                        if nxt_ub <= descent_bnd {
                            // Sufficient decrease: move the center to the candidate.
                            step = Step::Descent;
                            self.cnt_descent += 1;
                            self.data.cur_y = nxt_y.as_ref().clone();
                            self.data.cur_val = nxt_ub;
                            master.move_center(1.0, nxt_d.clone())?;
                            debug!("Descent Step");
                            debug!(" dir ={}", nxt_d);
                            debug!(" newy={}", self.data.cur_y);
                            self.data.cur_weight = self.weighter.descent_weight(&self.data);
                        } else {
                            // Null step: keep the center, only the model improves.
                            step = Step::Null;
                            self.cnt_null += 1;
                            self.data.cur_weight = self.weighter.null_weight(&self.data);
                        }
                        master.set_weight(self.data.cur_weight)?;
                        Self::show_info(&self.start_time, step, &self.data, self.cnt_descent, self.cnt_null);
                        cnt_iter += 1;
                        // Stop before requesting another master solve when the
                        // iteration budget is exhausted.
                        if cnt_iter >= niter { break }
                        master.solve(self.data.cur_val)?;
                    }
                },
                recv(master.rx) -> msg => {
                    debug!("Receive master response");
                    // Receive result (new candidate) from the master
                    let master_res = msg
                        .map_err(|err| Error::Process(err.into()))?
                        .map_err(Error::Master)?;
                    // Only test for termination once the real initial weight
                    // has been computed (cur_weight is infinity until then).
                    if self.data.cur_weight < Real::infinity() && self.terminator.terminate(&self.data) {
                        info!("Termination criterion satisfied");
                        return Ok(true)
                    }
                    // Compress bundle
                    master.compress()?;
                    // Compute new candidate.
                    self.data.nxt_mod = master_res.nxt_mod;
                    self.data.sgnorm = master_res.sgnorm;
                    let mut next_y = dvec![];
                    nxt_d = Arc::new(master_res.nxt_d);
                    // Candidate point: center plus the master's direction.
                    next_y.add(&self.data.cur_y, &nxt_d);
                    nxt_y = Arc::new(next_y);
                    // Reset evaluation data.
                    nxt_ubs.clear();
                    nxt_ubs.resize(self.problem.num_subproblems(), Real::infinity());
                    cnt_remaining_ubs = self.problem.num_subproblems();
                    nxt_cutvals.clear();
                    nxt_cutvals.resize(self.problem.num_subproblems(), -Real::infinity());
                    cnt_remaining_mins = self.problem.num_subproblems();
                    // Start evaluation of all subproblems at the new candidate.
                    for i in 0..self.problem.num_subproblems() {
                        self.problem.evaluate(i, nxt_y.clone(), i, client_tx.clone()).map_err(Error::Evaluation)?;
                    }
                    // Compute the real initial weight.
                    if self.data.cur_weight.is_infinite() {
                        let weight = self.weighter.initial_weight(&self.data);
                        self.data.cur_weight = weight;
                        master.set_weight(weight)?;
                    }
                },
            }
        }
        Ok(false)
    }
    /// Return the bound the function value must be below of to enforce a descent step.
    ///
    /// If the oracle guarantees that $f(\bar{y}) \le$ this bound, the
    /// bundle method will perform a descent step.
    ///
    /// This value is $f(\hat{y}) + \varrho \cdot \Delta$ where
    /// $\Delta = f(\hat{y}) - \hat{f}(\bar{y})$ is the expected
    /// progress and $\varrho$ is the `acceptance_factor`.
    fn get_descent_bound(acceptance_factor: Real, data: &SolverData) -> Real {
        data.cur_val - acceptance_factor * (data.cur_val - data.nxt_mod)
    }
    /// Log a one-line progress report for the current iteration.
    ///
    /// Columns: elapsed time (hh:mm:ss.cc), descent-step count, total step
    /// count, master updates (currently hard-coded 0), a `*` marker for
    /// descent steps, current weight, expected progress, model value,
    /// candidate value and center value.
    fn show_info(start_time: &Instant, step: Step, data: &SolverData, cnt_descent: usize, cnt_null: usize) {
        let time = start_time.elapsed();
        info!(
            "{} {:0>2}:{:0>2}:{:0>2}.{:0>2} {:4} {:4} {:4}{:1} {:9.4} {:9.4} \
             {:12.6e}({:12.6e}) {:12.6e}",
            if step == Step::Term { "_endit" } else { "endit " },
            time.as_secs() / 3600,
            (time.as_secs() / 60) % 60,
            time.as_secs() % 60,
            // Hundredths of a second.
            time.subsec_nanos() / 10_000_000,
            cnt_descent,
            cnt_descent + cnt_null,
            0, /*self.master.cnt_updates(),*/
            if step == Step::Descent { "*" } else { " " },
            data.cur_weight,
            data.expected_progress(),
            data.nxt_mod,
            data.nxt_val,
            data.cur_val
        );
    }
}