/*
* Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous parallel bundle solver.
use crossbeam::channel::{unbounded as channel, Receiver, Sender};
use num_cpus;
use std::sync::Arc;
use threadpool::ThreadPool;
use crate::{DVector, Minorant, Real};
use super::problem::{EvalResult, FirstOrderProblem};
use crate::master::{
BoxedMasterProblem, CplexMaster, Error as MasterProblemError, MasterProblem, UnconstrainedMasterProblem,
};
/// The default iteration limit.
///
/// This is the limit passed to `solve_with_limit` when the solver is
/// started through `solve`.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// Error raised by the parallel bundle [`Solver`].
///
/// The type parameter `E` is the error type of the subproblem evaluations;
/// the solver instantiates it with the `Err` type of its problem.
#[derive(Debug)]
pub enum Error<E> {
/// An error raised by the master problem process.
Master(MasterProblemError<E>),
/// The iteration limit has been reached.
///
/// `limit` is the iteration limit that was in effect.
IterationLimit { limit: usize },
/// An error raised by a subproblem evaluation.
Evaluation(E),
}
impl<E> std::fmt::Display for Error<E>
where
E: std::fmt::Display,
{
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
use Error::*;
match self {
Master(err) => writeln!(fmt, "Error in master problem: {}", err),
IterationLimit { limit } => writeln!(fmt, "The iteration limit has been reached: {}", limit),
Evaluation(err) => writeln!(fmt, "Error in subproblem evaluation: {}", err),
}
}
}
impl<E> std::error::Error for Error<E>
where
E: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use Error::*;
match self {
Master(err) => Some(err),
Evaluation(err) => Some(err),
_ => None,
}
}
}
impl<E> From<MasterProblemError<E>> for Error<E>
where
E: std::error::Error + Send + Sync,
{
fn from(err: MasterProblemError<E>) -> Self {
Error::Master(err)
}
}
/// A task for the master problem.
///
/// Tasks are received by the master worker (`master_main`), which turns
/// each variant into one call on the master problem.
enum MasterTask {
/// Add a new minorant for a subfunction to the master problem.
///
/// The fields are the index of the subfunction and the minorant; both are
/// handed to the master's `add_minorant`.
AddMinorant(usize, Minorant),
/// Move the center of the master problem in the given direction.
///
/// Both fields are handed to the master's `move_center`; presumably the
/// first one is the step size along the direction (TODO confirm).
MoveCenter(Real, Arc<DVector>),
/// Start a new computation of the master problem.
///
/// `center_value` is handed to the master's `solve`.
Solve { center_value: Real },
}
/// Sending end of the channel on which the master worker reports back:
/// either a `Real` value or an error of the master problem.
type MasterSender<E> = Sender<std::result::Result<Real, MasterProblemError<E>>>;
/// Receiving end of the channel on which the master worker gets its tasks.
type MasterReceiver = Receiver<MasterTask>;
/// Implementation of a parallel bundle method.
///
/// All channel fields are `None` after construction with `new` and are
/// (re-)created by `init`.
pub struct Solver<P>
where
P: FirstOrderProblem,
{
/// The first order problem.
problem: P,
/// Current center of stability.
cur_y: DVector,
/// Function value in the current point.
cur_val: Real,
/// The threadpool of the solver.
threadpool: ThreadPool,
/// The channel to transmit new tasks to the master problem.
master_tx: Option<Sender<MasterTask>>,
/// The channel to receive solutions from the master problem.
master_rx: Option<Receiver<std::result::Result<Real, MasterProblemError<P::Err>>>>,
/// The channel to send the evaluation results of subproblems.
///
/// A clone of this sender is handed to the problem with each evaluation
/// request.
client_tx: Option<Sender<std::result::Result<EvalResult<usize, P::Primal>, P::Err>>>,
/// The channel to receive the evaluation results from subproblems.
client_rx: Option<Receiver<std::result::Result<EvalResult<usize, P::Primal>, P::Err>>>,
/// Number of descent steps.
cnt_descent: usize,
/// Number of function evaluations.
cnt_evals: usize,
}
impl<P> Solver<P>
where
P: FirstOrderProblem,
P::Err: std::error::Error + Send + Sync + 'static,
{
/// Create a new parallel bundle solver for `problem`.
///
/// The solver starts with a center created by `dvec![]`, a function value
/// of `0.0` and zeroed counters. All channels are unset until `init` is
/// called. The threadpool is named "Parallel bundle solver" and its size
/// is taken from `num_cpus::get()`.
///
/// In its current form the construction cannot fail, i.e. the result is
/// always `Ok`.
pub fn new(problem: P) -> Result<Solver<P>, Error<P::Err>> {
    let pool_name = "Parallel bundle solver".to_string();
    let threadpool = ThreadPool::with_name(pool_name, num_cpus::get());
    let solver = Solver {
        problem,
        cur_y: dvec![],
        cur_val: 0.0,
        threadpool,
        master_tx: None,
        master_rx: None,
        client_tx: None,
        client_rx: None,
        cnt_descent: 0,
        cnt_evals: 0,
    };
    Ok(solver)
}
/// Return a reference to the threadpool used by the solver.
///
/// In order to use the same threadpool for concurrent processes,
/// just clone the returned `ThreadPool`.
pub fn threadpool(&self) -> &ThreadPool {
&self.threadpool
}
/// Set the threadpool.
///
/// This function allows using a specific threadpool for all processes
/// spawned by the solver. Note that this does not involve any threads
/// used by the problem because the solver is not responsible for executing
/// the evaluation process of the subproblems. However, the problem might
/// use the same threadpool as the solver.
///
/// The previously stored threadpool handle is replaced by `threadpool`.
pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
self.threadpool = threadpool;
}
/// Initialize the solver.
///
/// This will reset the internal data structures so that a new fresh
/// solution process can be started.
///
/// It will also setup all worker processes.
///
/// This function is automatically called by [`solve`].
pub fn init(&mut self) -> Result<(), Error<P::Err>> {
let n = self.problem.num_variables();
let m = self.problem.num_subproblems();
self.cur_y.init0(n);
self.cnt_descent = 0;
self.cnt_evals = 0;
let (tx, rx) = channel();
self.client_tx = Some(tx);
self.client_rx = Some(rx);
let (tx, rx) = channel();
let (rev_tx, rev_rx) = channel();
self.master_tx = Some(tx);
self.master_rx = Some(rev_rx);
self.threadpool.execute(move || {
let mut rev_tx = rev_tx;
if let Err(err) = Self::master_main(&mut rev_tx, rx) {
#[allow(unused_must_use)]
{
rev_tx.send(Err(err));
}
}
});
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.cur_y.clone());
for i in 0..m {
self.problem
.evaluate(i, y.clone(), i, self.client_tx.clone().unwrap())
.map_err(Error::Evaluation)?;
}
let mut center_values: Vec<Option<Real>> = vec![None; m];
let mut minorants: Vec<Option<Minorant>> = vec![None; m];
let mut cnt_remaining_objs = m;
let mut cnt_remaining_mins = m;
for m in self.client_rx.as_ref().unwrap() {
match m {
Ok(EvalResult::ObjectiveValue { index: i, value }) => {
println!("Receive objective {}", i);
if center_values[i].is_none() {
cnt_remaining_objs -= 1;
center_values[i] = Some(value);
}
}
Ok(EvalResult::Minorant { index: i, minorant, .. }) => {
println!("Receive minorant {}", i);
if let Some(ref mut min) = minorants[i] {
if min.constant < minorant.constant {
*min = minorant;
}
} else {
cnt_remaining_mins -= 1;
minorants[i] = Some(minorant);
}
}
Err(err) => return Err(Error::Evaluation(err)),
};
if cnt_remaining_mins == 0 && cnt_remaining_objs == 0 {
break;
}
}
panic!("Initial evaluation completed");
Ok(())
}
/// The main loop of the master problem worker.
///
/// A CPLEX based master problem is created and wrapped in a
/// `BoxedMasterProblem`. Afterwards the tasks arriving on `rx` are
/// processed one after another until the task channel is closed, at which
/// point the function returns `Ok(())`.
///
/// The first error raised while creating the master problem, adding a
/// minorant or solving is returned to the caller.
///
/// NOTE(review): `tx` is currently not used by this function, i.e. no
/// result of `solve` is reported back here.
fn master_main(
    tx: &mut MasterSender<P::Err>,
    rx: MasterReceiver,
) -> std::result::Result<(), MasterProblemError<P::Err>> {
    let unconstrained = CplexMaster::new()?;
    let mut master = BoxedMasterProblem::with_master(unconstrained);

    // `recv` fails only once the channel is empty and all senders are gone.
    while let Ok(task) = rx.recv() {
        match task {
            MasterTask::AddMinorant(index, minorant) => {
                master.add_minorant(index, minorant)?;
            }
            MasterTask::MoveCenter(alpha, direction) => {
                master.move_center(alpha, &direction);
            }
            MasterTask::Solve { center_value } => {
                master.solve(center_value)?;
            }
        }
    }

    Ok(())
}
/// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
///
/// This is a shorthand for calling `solve_with_limit` with that limit; its
/// result is returned unchanged.
pub fn solve(&mut self) -> Result<(), Error<P::Err>> {
self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
}
/// Solve the problem with a maximal iteration limit.
pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), Error<P::Err>> {
// First initialize the internal data structures.
self.init()?;
if self.solve_iter(limit)? {
Ok(())
} else {
Err(Error::IterationLimit { limit })
}
}
/// Solve the problem but stop after at most `niter` iterations.
///
/// The function returns `Ok(true)` if the termination criterion
/// has been satisfied. Otherwise it returns `Ok(false)` or an
/// error code.
///
/// If this function is called again, the solution process is
/// continued from the previous point. Because of this one *must*
/// call `init()` before the first call to this function.
///
/// # Panics
///
/// The body is currently a stub: every call panics via
/// `unimplemented!()`. The contract above describes the intended
/// behaviour, and `niter` is not used yet.
pub fn solve_iter(&mut self, niter: usize) -> Result<bool, Error<P::Err>> {
unimplemented!()
}
}