/*
* Copyright (c) 2019, 2020 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous parallel bundle solver.
#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, RecvError};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, RecvError};
use float_pretty_print::PrettyPrintFloat;
use log::{debug, info, warn};
use num_cpus;
use num_traits::{Float, ToPrimitive, Zero};
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::channels::{
ChannelResultSender, ChannelUpdateSender, ClientReceiver, ClientSender, EvalResult, Message, Update,
};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse, Response};
use crate::data::Minorant;
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
pub mod guessmodels;
use guessmodels::{Guess, GuessModel, NearestValue};
/// The default iteration limit used by [`Solver::solve`].
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// The default solver: standard terminator, HK weighter and the full master problem builder.
pub type DefaultSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::FullMasterBuilder>;
/// The minimal bundle solver: like [`DefaultSolver`] but backed by the minimal master problem builder.
pub type NoBundleSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::MinimalMasterBuilder>;
/// Error raised by the parallel bundle [`Solver`].
///
/// `MErr` is the error type of the master problem solver and `PErr` the
/// error type of the first order problem.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error<MErr, PErr> {
/// An error raised when creating a new master problem solver.
BuildMaster(MErr),
/// An error raised by the master problem process.
Master(MErr),
/// The iteration limit has been reached.
IterationLimit { limit: usize },
/// An error raised by a subproblem evaluation.
Evaluation(PErr),
/// An error raised by a subproblem update.
Update(PErr),
/// The dimension of some data is wrong.
Dimension(String),
/// Invalid bounds for a variable.
InvalidBounds { lower: Real, upper: Real },
/// The value of a variable is outside its bounds.
ViolatedBounds { lower: Real, upper: Real, value: Real },
/// The variable index is out of bounds.
InvalidVariable { index: usize, nvars: usize },
/// Disconnected channel.
Disconnected,
/// An error occurred in a subprocess.
Process(RecvError),
/// A method requiring an initialized solver has been called.
NotInitialized,
/// The problem has not been solved yet.
NotSolved,
/// Missing function value during initialization for a certain subproblem.
MissingObjective(usize),
/// Missing minorant during initialization for a certain subproblem.
MissingMinorant(usize),
}
/// The result type of the solver.
///
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
///
/// The error type combines the master problem's error type and the
/// first order problem's error type (see [`Error`]).
pub type Result<T, P, M> = std::result::Result<
T,
Error<<<M as MasterBuilder>::MasterProblem as MasterProblem>::Err, <P as FirstOrderProblem>::Err>,
>;
impl<MErr, PErr> std::fmt::Display for Error<MErr, PErr>
where
    MErr: std::fmt::Display,
    PErr: std::fmt::Display,
{
    /// Format the error for user display.
    ///
    /// Fixed: the original mixed `writeln!` and `write!` between variants,
    /// producing inconsistent output. `Display` implementations
    /// conventionally do not append a trailing newline, so every variant
    /// now uses `write!`.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
        use Error::*;
        match self {
            BuildMaster(err) => write!(fmt, "Cannot create master problem solver: {}", err),
            Master(err) => write!(fmt, "Error in master problem: {}", err),
            IterationLimit { limit } => write!(fmt, "The iteration limit has been reached: {}", limit),
            Evaluation(err) => write!(fmt, "Error in subproblem evaluation: {}", err),
            Update(err) => write!(fmt, "Error in subproblem update: {}", err),
            Dimension(what) => write!(fmt, "Wrong dimension for {}", what),
            InvalidBounds { lower, upper } => write!(fmt, "Invalid bounds, lower:{}, upper:{}", lower, upper),
            ViolatedBounds { lower, upper, value } => write!(
                fmt,
                "Violated bounds, lower:{}, upper:{}, value:{}",
                lower, upper, value
            ),
            InvalidVariable { index, nvars } => {
                write!(fmt, "Variable index out of bounds, got:{} must be < {}", index, nvars)
            }
            Disconnected => write!(fmt, "A channel got disconnected"),
            Process(err) => write!(fmt, "Error in subprocess: {}", err),
            NotInitialized => write!(fmt, "The solver must be initialized (called Solver::init()?)"),
            NotSolved => write!(fmt, "The problem has not been solved yet"),
            MissingObjective(index) => write!(
                fmt,
                "Missing objective value during initialization for subproblem: {}",
                index
            ),
            MissingMinorant(index) => write!(
                fmt,
                "Missing minorant value during initialization for subproblem: {}",
                index
            ),
        }
    }
}
impl<MErr, PErr> std::error::Error for Error<MErr, PErr>
where
    MErr: std::error::Error + 'static,
    PErr: std::error::Error + 'static,
{
    /// Return the underlying cause of this error, if any.
    ///
    /// Fixed: the `Update` variant also wraps a `PErr` (which is an
    /// `std::error::Error` by the bound above) but was previously not
    /// reported as a source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            // Errors originating from the master problem (`MErr`).
            BuildMaster(err) | Master(err) => Some(err),
            // Errors originating from the subproblems (`PErr`).
            Evaluation(err) | Update(err) => Some(err),
            // Errors originating from a worker channel.
            Process(err) => Some(err),
            _ => None,
        }
    }
}
impl<MErr, PErr> From<masterprocess::Error<MErr, PErr>> for Error<MErr, PErr>
where
MErr: std::error::Error + 'static,
{
fn from(err: masterprocess::Error<MErr, PErr>) -> Error<MErr, PErr> {
use masterprocess::Error::*;
match err {
DisconnectedSender => Error::Disconnected,
DisconnectedReceiver => Error::Disconnected,
Aggregation(err) => Error::Master(err),
SubgradientExtension(err) => Error::Update(err),
Master(err) => Error::Master(err),
}
}
}
impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
fn from(err: RecvError) -> Error<MErr, PErr> {
Error::Process(err)
}
}
/// Identifier for a subproblem evaluation.
///
/// Uniquely identifies one evaluation request: which subproblem was
/// evaluated and at which candidate point.
#[derive(Clone, Copy, Debug)]
struct EvalId {
/// The index of the subproblem.
subproblem: usize,
/// The index of the candidate at which the subproblem is evaluated
/// (an index into the solver's list of candidate points).
candidate_index: usize,
}
/// Parameters for tuning the solver.
///
/// The fields are private; use the `set_*` methods (which validate the
/// ranges) to change them.
#[derive(Debug, Clone)]
pub struct Parameters {
/// The descent step acceptance factors, must be in (0,1).
///
/// The default value is 0.1.
acceptance_factor: Real,
/// The factor for asynchronous imprecision, must be in (0,1).
///
/// The default value is 0.9.
imprecision_factor: Real,
}
impl Default for Parameters {
fn default() -> Self {
Parameters {
acceptance_factor: 0.1,
imprecision_factor: 0.9,
}
}
}
impl Parameters {
    /// Set the descent step acceptance factor (default: 0.1).
    ///
    /// # Panics
    ///
    /// Panics unless the factor lies strictly between 0 and 1.
    pub fn set_acceptance_factor(&mut self, acceptance_factor: Real) {
        assert!(
            0.0 < acceptance_factor && acceptance_factor < 1.0,
            "Descent step acceptance factors must be in (0,1), got: {}",
            acceptance_factor
        );
        self.acceptance_factor = acceptance_factor;
    }
    /// Set the asynchronous imprecision factor (default: 0.9).
    ///
    /// # Panics
    ///
    /// Panics unless the factor lies strictly between 0 and 1.
    pub fn set_imprecision_factor(&mut self, imprecision_factor: Real) {
        assert!(
            0.0 < imprecision_factor && imprecision_factor < 1.0,
            "Imprecision factor must be in (0,1), got: {}",
            imprecision_factor
        );
        self.imprecision_factor = imprecision_factor;
    }
}
/// The step type that has been performed.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Step {
/// A null step has been performed (center unchanged).
Null,
/// A descent step has been performed (center moved to the candidate).
Descent,
/// No step but the algorithm has been terminated.
Term,
}
/// Internal algorithm state of the parallel bundle [`Solver`].
///
/// This is the data the terminator and weighter heuristics operate on
/// (see the `StandardTerminatable` and `HKWeightable` impls below).
pub struct SolverData {
/// Current center of stability.
cur_y: Arc<DVector>,
/// Function value in the current point.
cur_val: Real,
/// Function value at the current candidate.
nxt_val: Real,
/// Model value at the current candidate.
nxt_mod: Real,
/// Submodel values at the current candidate.
nxt_submods: Vec<Real>,
/// The current expected progress.
///
/// This value is actually `cur_val - nxt_val`. We store it separately only
/// for debugging purposes because after a descent step `cur_val` will be
/// changed and we could not see the "old" expected progress anymore that
/// led to the descent step.
expected_progress: Real,
/// Norm of current aggregated subgradient.
sgnorm: Real,
/// The error bound for the last descent step.
error_bound: Real,
/// The currently used master problem weight.
cur_weight: Real,
/// Maximal number of iterations.
max_iter: usize,
/// Number of inner model updates.
cnt_updates: usize,
/// Number of descent steps.
cnt_descent: usize,
/// Index of the current center.
center_index: usize,
/// Index of the current candidate.
candidate_index: usize,
/// The number of subproblems with insufficient evaluation data.
num_insufficient_candidates: usize,
/// The number of subproblems that have not been evaluated exactly in the center.
num_inexact_center: usize,
/// Whether the next step should be a forced descent step.
force_descent: bool,
/// Subproblem data.
subs: Vec<SubData>,
/// Step direction (i.e. nxt_y - cur_y).
nxt_d: Arc<DVector>,
/// Current candidate.
nxt_y: Arc<DVector>,
/// The list of all evaluation points.
candidates: Vec<Point>,
/// Whether we need a new update
need_update: bool,
/// Whether a problem update is currently in progress.
update_in_progress: bool,
}
impl SolverData {
    /// Reset the solver data for a fresh run starting from center `y`.
    ///
    /// Almost everything is reset to (+/-) infinity so that a null step
    /// is forced after the first evaluation.
    fn init(&mut self, y: DVector) {
        self.cur_y = Arc::new(y);
        self.cnt_descent = 0;
        self.candidates.clear();
        self.candidate_index = 0;
        // Infinite values force a null step on the first evaluation.
        self.cur_val = Real::infinity();
        self.nxt_val = Real::infinity();
        self.nxt_mod = -Real::infinity();
        for v in self.nxt_submods.iter_mut() {
            *v = -Real::infinity();
        }
        self.expected_progress = Real::infinity();
        self.error_bound = Real::infinity();
        self.sgnorm = Real::infinity();
        self.cur_weight = 1.0;
        self.num_insufficient_candidates = 0;
        // No subproblem has been evaluated exactly in the new center yet.
        self.num_inexact_center = self.nxt_submods.len();
        self.force_descent = false;
    }
}
impl Default for SolverData {
    /// Empty solver state; [`SolverData::init`] (via `Solver::init`) must be
    /// called before the data is usable.
    fn default() -> SolverData {
        SolverData {
            // Points and values.
            cur_y: Arc::new(dvec![]),
            nxt_y: Arc::new(dvec![]),
            nxt_d: Arc::new(dvec![]),
            cur_val: 0.0,
            nxt_val: 0.0,
            nxt_mod: 0.0,
            nxt_submods: vec![],
            // Progress measures.
            expected_progress: 0.0,
            sgnorm: 0.0,
            error_bound: Real::infinity(),
            cur_weight: 1.0,
            // Indices and counters.
            center_index: 0,
            candidate_index: 0,
            num_insufficient_candidates: 0,
            num_inexact_center: 0,
            max_iter: 0,
            cnt_descent: 0,
            cnt_updates: 0,
            // Flags and auxiliary containers.
            force_descent: false,
            candidates: vec![],
            subs: vec![],
            need_update: true,
            update_in_progress: false,
        }
    }
}
/// Accessors required by the standard termination criterion.
impl StandardTerminatable for SolverData {
/// The objective value at the current center of stability.
fn center_value(&self) -> Real {
self.cur_val
}
/// The currently expected progress (`cur_val - nxt_val`).
fn expected_progress(&self) -> Real {
self.expected_progress
}
}
/// Accessors required by the Helmberg-Kiwiel weight update heuristic.
impl HKWeightable for SolverData {
/// The currently used master problem weight.
fn current_weight(&self) -> Real {
self.cur_weight
}
/// The current center of stability.
fn center(&self) -> &DVector {
&self.cur_y
}
/// The objective value at the center.
fn center_value(&self) -> Real {
self.cur_val
}
/// The objective value at the current candidate.
fn candidate_value(&self) -> Real {
self.nxt_val
}
/// The model value at the current candidate.
fn candidate_model(&self) -> Real {
self.nxt_mod
}
/// The cut value used for new minorants; this is the center value.
fn new_cutvalue(&self) -> Real {
self.cur_val
}
/// Norm of the current aggregated subgradient.
fn sgnorm(&self) -> Real {
self.sgnorm
}
}
/// Data providing access for updating the problem.
struct UpdateData<Pr> {
/// Type of step.
step: Step,
/// Current center of stability.
cur_y: Arc<DVector>,
/// Current candidate.
nxt_y: Arc<DVector>,
/// The aggregated primal solutions, indexed by subproblem
/// (see [`UpdateState::aggregated_primal`]).
primal_aggrs: Vec<Pr>,
}
impl<Pr> UpdateState<Pr> for UpdateData<Pr>
where
Pr: Send + 'static,
{
/// Whether the last step was a descent step.
fn was_descent(&self) -> bool {
self.step == Step::Descent
}
/// The current center of stability.
fn center(&self) -> &DVector {
&self.cur_y
}
/// The current candidate point.
fn candidate(&self) -> &DVector {
&self.nxt_y
}
/// The aggregated primal solution of subproblem `i`.
fn aggregated_primal(&self, i: usize) -> &Pr {
&self.primal_aggrs[i]
}
}
/// An evaluation point.
///
/// Points are shared via `Arc`; the `index` uniquely identifies the
/// point within the solver's candidate list.
#[derive(Clone)]
pub struct Point {
/// The globally unique index of the evaluation point.
index: usize,
/// The evaluation point itself.
point: Arc<DVector>,
}
impl Point {
    /// Euclidean distance between this point and `p`.
    ///
    /// Points sharing the same global index are identical by construction,
    /// so their distance is zero without any vector arithmetic.
    fn distance(&self, p: &Point) -> Real {
        if self.index == p.index {
            return Real::zero();
        }
        let mut diff = self.point.as_ref().clone();
        diff.add_scaled(-1.0, &p.point);
        diff.norm2()
    }
}
/// Model data of a single subproblem.
///
/// This struct does not handle the subproblem model itself. However, it handles
/// the asynchronous precision data, i.e. the guessed Lipschitz-constant and the
/// distance of the evaluation points to the candidate.
///
/// The concrete model used for computing the guessed values in the candidate
/// and the center must be provided by an implementation of `SubProblem`.
struct SubData {
/// The index associated with this subproblem.
fidx: usize,
/// The subproblem model used for guessing values at unevaluated points.
sub: Box<dyn GuessModel>,
/// The current candidate index.
candidate_index: usize,
/// The last index at which the evaluation has been started.
last_eval_index: usize,
/// Whether a subproblem evaluation is currently running.
is_running: bool,
/// Whether the last evaluation has been sufficiently close.
is_close_enough: bool,
/// The guess value and its evaluation distance in the current center.
center_guess: Guess,
/// The current guess of the Lipschitz constant.
l_guess: Real,
}
impl SubData {
/// Create fresh subproblem data for subproblem `fidx` backed by `sub`.
fn new(fidx: usize, sub: Box<dyn GuessModel>) -> SubData {
SubData {
fidx,
sub,
last_eval_index: 0,
candidate_index: 0,
is_running: false,
is_close_enough: false,
center_guess: Guess::default(),
l_guess: 0.0,
}
}
/// Set the center of this model.
///
/// If `update_l_guess` is true also update the guess of the Lipschitz constant.
///
/// The new center must be the current candidate (asserted below). The
/// Lipschitz guess is derived from the difference between the old center's
/// guess value and its now-known cut value, divided by the guess distance.
fn set_center(&mut self, y: &Point, update_l_guess: bool) {
assert_eq!(y.index, self.candidate_index, "Must move to current candidate");
// The guess value used in the current (i.e. old) center
let old_guess = self.center_guess;
// The cut value now known for the center.
let old_cutvalue = self.sub.center_lb();
// There has been a previous evaluation, so first update the Lipschitz guess ...
if update_l_guess && old_guess.dist > Real::zero() {
debug!(
"L-guess fidx:{} guess:{} cut:{}",
self.fidx, old_guess.value, old_cutvalue
);
// An infinite distance carries no information, so the ratio is
// treated as zero in that case.
let new_l_guess = if old_guess.dist.is_finite() {
(old_cutvalue - old_guess.value) / old_guess.dist
} else {
Real::zero()
};
// The Lipschitz guess only ever grows (keep the largest observed ratio).
if new_l_guess > self.l_guess {
debug!(
"New l_guess fidx:{} old-L:{} L:{}",
self.fidx, self.l_guess, new_l_guess
);
self.l_guess = new_l_guess;
}
}
// Save guess value of the candidate/new center
self.center_guess = self.sub.candidate_value().unwrap_or_else(Guess::default);
// Update submodel.
self.sub.set_center(y);
}
/// Set the candidate of this model.
fn set_candidate(&mut self, y: &Point, accept_factor: Real) {
self.sub.set_candidate(y);
self.candidate_index = y.index;
self.update_close_enough(accept_factor);
}
/// Add a function value to this model.
///
/// The `accept_factor` is a parameter for possibly accepting
/// the candidate guess value as "good enough" (if it has been
/// changed by the new function value).
fn add_function_value(&mut self, y: &Point, value: Real, accept_factor: Real) {
self.sub.add_function_value(y, value);
self.update_close_enough(accept_factor)
}
/// Add a minorant to this model.
///
/// The `accept_factor` is a parameter for possibly accepting
/// the candidate guess value as "good enough" (if it has been
/// changed by the new minorant).
///
/// The minorant must be centered at the global 0.
fn add_minorant(&mut self, y: &Point, m: &Arc<Minorant>, accept_factor: Real) {
self.sub.add_minorant(y, m);
self.update_close_enough(accept_factor)
}
/// Return the current guess value at the current candidate.
///
/// The function might return `None` if no guess value is available.
fn candidate_value(&self) -> Option<Guess> {
self.sub.candidate_value()
}
/// Return the current lower bound value at the center.
fn center_lb(&self) -> Real {
self.sub.center_lb()
}
/// Recompute the `is_close_enough` flag.
///
/// The candidate guess is accepted when it is exact or when the guessed
/// error (distance times Lipschitz guess) stays below `accept_factor`.
/// With no guess available the flag is false.
fn update_close_enough(&mut self, accept_factor: Real) {
self.is_close_enough = self
.sub
.candidate_value()
.map(|v| v.is_exact() || v.dist * self.l_guess <= accept_factor)
.unwrap_or(false);
}
/// Return the error estimation in the center.
///
/// This is the difference between the (current) lower bound and the used
/// guess value.
fn error_estimate(&self) -> Real {
self.sub.center_lb() - self.center_guess.value
}
/// The guess value that was used when the current center was adopted.
fn center_guess_value(&self) -> Real {
self.center_guess.value
}
}
/// Implementation of a parallel bundle method.
///
/// - `P` is the first order problem to be solved,
/// - `T` is the termination predicate,
/// - `W` is the weight update heuristic,
/// - `M` is the master problem builder.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter, M = crate::master::FullMasterBuilder>
where
P: FirstOrderProblem,
P::Err: 'static,
M: MasterBuilder,
{
/// Parameters for the solver.
pub params: Parameters,
/// Termination predicate.
pub terminator: T,
/// Weighter heuristic.
pub weighter: W,
/// The threadpool of the solver.
pub threadpool: ThreadPool,
/// The master problem builder.
pub master: M,
/// The first order problem.
problem: P,
/// The algorithm data.
data: SolverData,
/// The master problem process.
master_proc: Option<MasterProcess<P, M::MasterProblem>>,
/// Whether there is currently a master computation running.
master_running: bool,
/// Whether the master problem has been changed.
master_has_changed: bool,
/// The sending half handed to subproblems for returning evaluation results.
client_tx: Option<ClientSender<EvalId, P, M::MasterProblem>>,
/// The channel to receive the evaluation results from subproblems.
client_rx: Option<ClientReceiver<EvalId, P, M::MasterProblem>>,
/// Time when the solution process started.
///
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
P::Err: Send + 'static,
T: Terminator<SolverData> + Default,
W: Weighter<SolverData> + Default,
M: MasterBuilder,
<M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
/// Create a new parallel bundle solver.
///
/// Delegates to [`Solver::with_master`] with a default-constructed
/// master problem builder.
pub fn new(problem: P) -> Self
where
M: Default,
{
Self::with_master(problem, M::default())
}
/// Create a new parallel bundle solver with an explicit master builder.
///
/// The internal threadpool is sized with one worker per logical CPU;
/// use [`Solver::set_threadpool`] to replace it.
pub fn with_master(problem: P, master: M) -> Self {
    let num_workers = num_cpus::get();
    info!("Initializing asynchronous solver with {} CPUs", num_workers);
    let threadpool = ThreadPool::with_name("Parallel bundle solver".to_string(), num_workers);
    Solver {
        params: Parameters::default(),
        terminator: Default::default(),
        weighter: Default::default(),
        threadpool,
        master,
        problem,
        data: SolverData::default(),
        master_proc: None,
        master_running: false,
        master_has_changed: false,
        client_tx: None,
        client_rx: None,
        start_time: Instant::now(),
    }
}
/// Return the underlying threadpool.
///
/// In order to use the same threadpool for concurrent processes,
/// just clone the returned `ThreadPool`.
pub fn threadpool(&self) -> &ThreadPool {
&self.threadpool
}
/// Set the threadpool.
///
/// This function allows to use a specific threadpool for all processes
/// spawned by the solver. Note that this does not involve any threads
/// used by the problem because the solver is not responsible for executing
/// the evaluation process of the subproblems. However, the problem might
/// use the same threadpool as the solver.
///
/// The new pool takes effect for processes spawned afterwards (e.g. the
/// master process started by the next call to [`Solver::init`]).
pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
self.threadpool = threadpool;
}
/// Return a shared reference to the problem associated with the solver.
pub fn problem(&self) -> &P {
&self.problem
}
/// Initialize the solver.
///
/// This will reset the internal data structures so that a new fresh
/// solution process can be started.
///
/// It will also setup all worker processes: the master process is spawned
/// and every subproblem is evaluated once at the initial center (the
/// origin) before this function returns.
///
/// This function is automatically called by [`Solver::solve`].
///
/// # Errors
///
/// Returns `Error::Dimension` if the problem's bound vectors do not match
/// the number of variables, `Error::BuildMaster` if the master problem
/// cannot be created, and `Error::Evaluation` if an initial subproblem
/// evaluation fails.
pub fn init(&mut self) -> Result<(), P, M> {
    debug!("Initialize solver");
    let n = self.problem.num_variables();
    let m = self.problem.num_subproblems();
    self.data.init(dvec![0.0; n]);
    // Fresh channel pair for receiving results from the worker processes.
    let (tx, rx) = channel();
    self.client_tx = Some(tx.clone());
    self.client_rx = Some(rx);
    let master_config = MasterConfig {
        num_subproblems: m,
        num_vars: n,
        lower_bounds: self.problem.lower_bounds().map(DVector),
        upper_bounds: self.problem.upper_bounds().map(DVector),
    };
    // Reject bound vectors whose length does not match the number of variables.
    if master_config
        .lower_bounds
        .as_ref()
        .map(|lb| lb.len() != n)
        .unwrap_or(false)
    {
        return Err(Error::Dimension("lower bounds".to_string()));
    }
    if master_config
        .upper_bounds
        .as_ref()
        .map(|ub| ub.len() != n)
        .unwrap_or(false)
    {
        return Err(Error::Dimension("upper bounds".to_string()));
    }
    debug!("Start master process");
    self.master_proc = Some(MasterProcess::start(
        self.master.build().map_err(Error::BuildMaster)?,
        master_config,
        tx,
        &mut self.threadpool,
    ));
    debug!("Initial problem evaluation");
    // We need an initial evaluation of all oracles for the first center.
    self.data.subs = (0..m)
        .map(|fidx| SubData::new(fidx, Box::new(NearestValue::new())))
        .collect();
    self.data.nxt_y = self.data.cur_y.clone();
    // The initial evaluation point.
    let point = Point {
        index: self.data.candidate_index,
        point: self.data.cur_y.clone(),
    };
    // This could be done better: the initial evaluation has index 1!
    self.data.candidates.push(point.clone());
    self.data.candidates.push(point.clone());
    self.data.candidate_index = 1;
    for i in 0..m {
        self.data.subs[i].set_candidate(&point, 0.0);
        self.data.subs[i].set_center(&point, false);
        self.evaluate_subproblem(i)?;
    }
    self.start_time = Instant::now();
    // Wait synchronously for all initial subproblem evaluations.
    let mut cnt_remaining = self.problem.num_subproblems();
    let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
    let client_rx = self.client_rx.as_ref().ok_or(Error::NotInitialized)?;
    while cnt_remaining > 0 {
        let msg = client_rx.recv();
        match msg? {
            Message::Eval(m) => match m {
                EvalResult::ObjectiveValue { index, value } => {
                    assert_eq!(
                        index.candidate_index, self.data.candidate_index,
                        "Receive objective value for unexpected candidate"
                    );
                    self.data.subs[index.subproblem].add_function_value(&point, value, 0.0);
                }
                EvalResult::Minorant {
                    index,
                    mut minorant,
                    primal,
                } => {
                    // BUGFIX: this assertion previously reused the message of
                    // the objective-value branch above.
                    assert_eq!(
                        index.candidate_index, self.data.candidate_index,
                        "Receive minorant for unexpected candidate"
                    );
                    // Add the minorant to the master problem.
                    // The minorant is centered at the candidate == center, so it does
                    // not need to be moved.
                    master.add_minorant(index.subproblem, minorant.clone(), primal)?;
                    self.master_has_changed = true;
                    // Center the minorant at 0.
                    minorant.move_center(-1.0, &point.point);
                    self.data.subs[index.subproblem].add_minorant(&point, &Arc::new(minorant), 0.0);
                }
                EvalResult::Done { index } => {
                    self.data.subs[index.subproblem].is_running = false;
                    cnt_remaining -= 1;
                }
                EvalResult::Error { err, index } => {
                    self.data.subs[index.subproblem].is_running = false;
                    return Err(Error::Evaluation(err));
                }
            },
            Message::Update(_) => {
                unreachable!("Receive update response during initialization");
            }
            Message::Master(_) => {
                unreachable!("Receive master response during initialization");
            }
        }
    }
    // Set the initial values.
    // For the center this is the current lower bound (cut value),
    // for the candidate it is the model candidate value.
    //
    // Note that both are the same ...
    self.data.cur_val = 0.0;
    self.data.nxt_val = 0.0;
    for s in &self.data.subs {
        self.data.cur_val += s.center_lb();
        self.data.nxt_val += s.candidate_value().unwrap().value;
    }
    assert!((self.data.cur_val - self.data.nxt_val).abs() < 1e-6);
    // This is the first evaluation. We effectively get
    // the function value at the current center but
    // we do not have a model estimate yet. Hence, we do not know
    // a good guess for the weight.
    self.data.cur_weight = Real::infinity();
    self.data.need_update = true;
    self.update_problem(Step::Descent)?;
    debug!("First Step");
    debug!(" cur_val={}", self.data.cur_val);
    self.data.cnt_descent += 1;
    // Compute the initial candidate.
    self.update_candidate()?;
    self.show_info(Step::Descent);
    debug!("Initialization complete");
    Ok(())
}
/// Reset the per-run iteration data before starting a batch of iterations.
///
/// Resets the step direction and the update counter, forgets the previous
/// candidate point and recomputes the candidate value from the current
/// submodel guesses.
fn reset_iteration_data(&mut self, max_iter: usize) {
    let nvars = self.problem.num_variables();
    self.data.max_iter = max_iter;
    self.data.cnt_updates = 0;
    self.data.nxt_d = Arc::new(dvec![0.0; nvars]);
    self.data.nxt_y = Arc::new(dvec![]);
    // Candidate value = sum of all submodel candidate guesses.
    let mut nxt_val = 0.0;
    for sub in self.data.subs.iter() {
        nxt_val += sub.candidate_value().unwrap().value;
    }
    self.data.nxt_val = nxt_val;
    self.data.need_update = true;
    self.data.update_in_progress = false;
}
/// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
///
/// Equivalent to `self.solve_with_limit(DEFAULT_ITERATION_LIMIT)`.
pub fn solve(&mut self) -> Result<(), P, M> {
self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
}
/// Solve the problem with a maximal iteration limit.
pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), P, M> {
// First initialize the internal data structures.
self.init()?;
if self.solve_iter(limit)? {
Ok(())
} else {
Err(Error::IterationLimit { limit })
}
}
/// Solve the problem but stop after at most `niter` iterations.
///
/// The function returns `Ok(true)` if the termination criterion
/// has been satisfied. Otherwise it returns `Ok(false)` or an
/// error code.
///
/// If this function is called again, the solution process is
/// continued from the previous point. Because of this one *must*
/// call `init()` before the first call to this function.
pub fn solve_iter(&mut self, niter: usize) -> Result<bool, P, M> {
debug!("Start solving up to {} iterations", niter);
self.reset_iteration_data(niter);
// Main event loop: dispatch messages from the worker channel until a
// handler signals termination.
loop {
let msg = self.client_rx.as_ref().ok_or(Error::NotInitialized)?.recv()?;
match msg {
Message::Eval(m) => {
// Receive a evaluation result
if self.handle_client_response(m)? {
return Ok(true);
}
}
Message::Update(msg) => {
debug!("Receive update response");
if self.handle_update_response(msg)? {
// The master problem has been changed so we need a new
// candidate as well.
self.update_candidate()?;
}
}
Message::Master(mresponse) => {
debug!("Receive master response");
// Receive result (new candidate) from the master
if self.handle_master_response(mresponse)? {
return Ok(true);
}
}
}
}
}
/// Handle a response `msg` from a subproblem evaluation.
///
/// The result from a subproblem evaluation is either a (upper bound on the)
/// function value is some point or a subgradient (or linear minorant) on
/// the function in some point.
///
/// The current implementation will wait until all subproblems have created
/// at least one function value and one subgradient. Once this is true,
/// either a descent step or a null step is performed depending on the
/// outcome of the descent test.
///
/// After the step the problem gets a chance to update (e.g. generate new
/// variables), and the master problem is started to compute a new
/// candidate.
///
/// The function returns
/// - `Ok(true)` if the final iteration count has been reached,
/// - `Ok(false)` if the final iteration count has not been reached,
/// - `Err(_)` on error.
fn handle_client_response(&mut self, msg: EvalResult<EvalId, P::Primal, P::Err>) -> Result<bool, P, M> {
    match msg {
        // A new (upper bound on the) function value in some point.
        EvalResult::ObjectiveValue { index, value } => self.handle_new_objective(index, value),
        // A new linear minorant of a subproblem function.
        EvalResult::Minorant {
            index,
            minorant,
            primal,
        } => self.handle_new_minorant(index, minorant, primal),
        // The evaluation process finished; possibly restart it for the
        // current candidate.
        EvalResult::Done { index } => {
            self.data.subs[index.subproblem].is_running = false;
            self.evaluate_subproblem(index.subproblem)?;
            Ok(false)
        }
        // The evaluation failed; mark it as stopped and propagate the error.
        EvalResult::Error { err, index } => {
            self.data.subs[index.subproblem].is_running = false;
            Err(Error::Evaluation(err))
        }
    }
}
/// A new objective value has been computed.
///
/// Folds the new function value into the submodel of the reporting
/// subproblem, adjusts the running totals `nxt_val`/`cur_val` by the
/// induced change, and then checks whether a null/descent step can be
/// performed.
fn handle_new_objective(&mut self, id: EvalId, value: Real) -> Result<bool, P, M> {
debug!(
"Receive objective from subproblem:{} candidate:{} current:{} obj:{}",
id.subproblem, id.candidate_index, self.data.candidate_index, value
);
// check if new evaluation is closer to the current candidate
let sub = &mut self.data.subs[id.subproblem];
// Whether the subproblem evaluation had been good enough.
let was_close_enough = sub.is_close_enough;
if let Some(nxt) = self.data.candidates.get_mut(id.candidate_index) {
// Per-subproblem share of the tolerated asynchronous imprecision.
let accept_factor =
self.params.imprecision_factor * self.params.acceptance_factor * self.data.expected_progress
/ self.problem.num_subproblems().to_f64().unwrap();
let old_can_val = sub.candidate_value().unwrap().value;
let old_cen_val = sub.center_lb();
sub.add_function_value(nxt, value, accept_factor);
let new_can_val = sub.candidate_value().unwrap().value;
let new_cen_val = sub.center_lb();
// Update the totals incrementally by the change of this submodel.
self.data.nxt_val += new_can_val - old_can_val;
self.data.cur_val += new_cen_val - old_cen_val;
// This is just a safe-guard: if the function has been evaluated at
// the current candidate, the evaluation *must* be good enough.
assert!(
sub.is_close_enough || sub.last_eval_index < self.data.candidate_index,
"Unexpected insufficiency fidx:{} l:{}",
id.subproblem,
sub.l_guess,
);
} else {
// unknown candidate -> ignore objective value
warn!("Ignore unknown candidate index:{}", id.candidate_index);
return Ok(false);
}
// Test if the new candidate is close enough for the asynchronous
// precision test.
if !was_close_enough && sub.is_close_enough {
debug!(
"Accept result fidx:{} index:{} candidate:{} (remaining insufficient: {})",
id.subproblem, id.candidate_index, self.data.candidate_index, self.data.num_insufficient_candidates
);
}
self.maybe_do_step(false)
}
/// Add a new minorant.
///
/// The minorant is added to the submodel as well as the master problem. The modified submodel
/// may then lead to a potential null/descent-step.
///
/// Note the recentering order: the minorant arrives centered at the
/// evaluation point, is moved to the global 0 for the submodel, and is
/// then moved to the current center before being handed to the master.
///
/// Return values
/// - `Ok(true)` if the termination criterion has been satisfied,
/// - `Ok(false)` if the termination criterion has not been satisfied,
/// - `Err(_)` on error.
fn handle_new_minorant(&mut self, id: EvalId, minorant: Minorant, primal: P::Primal) -> Result<bool, P, M> {
debug!(
"Receive minorant subproblem:{} candidate:{} current:{} center:{}",
id.subproblem, id.candidate_index, self.data.candidate_index, self.data.center_index,
);
// Per-subproblem share of the tolerated asynchronous imprecision.
let accept_factor =
self.params.imprecision_factor * self.params.acceptance_factor * self.data.expected_progress
/ self.problem.num_subproblems().to_f64().unwrap();
let sub = &mut self.data.subs[id.subproblem];
let mut minorant = minorant;
if let Some(point) = self.data.candidates.get_mut(id.candidate_index) {
// center the minorant at 0
minorant.move_center(-1.0, &point.point);
// add minorant to submodel
let old_can_val = sub.candidate_value().unwrap().value;
let old_cen_val = sub.center_lb();
sub.add_minorant(&point, &Arc::new(minorant.clone()), accept_factor);
let new_can_val = sub.candidate_value().unwrap().value;
let new_cen_val = sub.center_lb();
// Update the totals incrementally by the change of this submodel.
self.data.nxt_val += new_can_val - old_can_val;
self.data.cur_val += new_cen_val - old_cen_val;
// center the minorant at the current center
minorant.move_center(1.0, &self.data.cur_y);
} else {
warn!("Ignore unknown candidate index:{}", id.candidate_index);
return Ok(false);
}
// add minorant to master problem
let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
master.add_minorant(id.subproblem, minorant, primal)?;
self.master_has_changed = true;
self.maybe_do_step(false)
}
/// Handle a response `master_res` from the master problem process.
///
/// The master response is the new candidate point. The method updates the
/// algorithm state with the data of the new candidate (e.g. the model value
/// `nxt_mod` in the point or the expected progress). Then it tests whether
/// a termination criterion is satisfied. This is only the case if there is
/// no pending problem update.
///
/// Finally the master problem starts the evaluation of all subproblems at
/// the new candidate.
///
/// The new candidate is immediately checked for a potential new step.
///
/// Return values
/// - `Ok(true)` if the termination criterion has been satisfied,
/// - `Ok(false)` if the termination criterion has not been satisfied,
/// - `Err(_)` on error.
fn handle_master_response(&mut self, master_res: MasterResponse<P, M::MasterProblem>) -> Result<bool, P, M> {
    // Copy the master's solution data into the solver state.
    match master_res {
        Response::Error(err) => return Err(err.into()),
        Response::Result {
            nxt_mod,
            sgnorm,
            cnt_updates,
            nxt_d,
            nxt_submods,
            center_index,
        } => {
            self.data.nxt_d = Arc::new(nxt_d);
            self.data.nxt_mod = nxt_mod;
            self.data.sgnorm = sgnorm;
            self.data.cnt_updates = cnt_updates;
            self.data.nxt_submods.clear();
            self.data.nxt_submods.extend(nxt_submods);
            debug!(
                "Master Response current_center:{} current_candidate:{} res_center:{} nxt_mod:{}",
                self.data.center_index, self.data.candidate_index, center_index, self.data.nxt_mod
            );
        }
    };
    // Expected progress = current center value minus model value in the candidate.
    self.data.expected_progress = self.data.cur_val - self.data.nxt_mod;
    self.master_running = false;
    let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
    // If this is the very first solution of the model,
    // we use its result as to make a good guess for the initial weight
    // of the proximal term and resolve.
    if self.data.cur_weight.is_infinite() {
        self.data.cur_weight = self.weighter.initial_weight(&self.data);
        master.set_weight(self.data.cur_weight)?;
        self.master_has_changed = true;
        self.update_candidate()?;
        return Ok(false);
    }
    // Compress bundle
    master.compress()?;
    // Compute new candidate.
    let mut next_y = dvec![];
    self.data.candidate_index += 1;
    // Check if new variables had been added. In this case, resize cur_y by
    // padding it with zeros so its dimension matches the new direction `nxt_d`.
    if self.data.nxt_d.len() > self.data.cur_y.len() {
        let nnew = self.data.nxt_d.len() - self.data.cur_y.len();
        // BUGFIX: the resize must happen unconditionally here. The former
        // guard `nnew != self.data.cur_y.len()` skipped the resize exactly
        // when the dimension had doubled, leaving `cur_y` shorter than
        // `nxt_d` in the addition below. (Note `nnew > 0` always holds
        // because of the outer length comparison.)
        let mut cur_y = self.data.cur_y.as_ref().clone();
        cur_y.extend(repeat(0.0).take(nnew));
        self.data.cur_y = Arc::new(cur_y);
    }
    // New candidate = current center + direction from the master.
    next_y.add(&self.data.cur_y, &self.data.nxt_d);
    #[cfg(debug_assertions)]
    {
        if self.data.nxt_y.len() == next_y.len() {
            let mut diff = self.data.nxt_y.as_ref().clone();
            diff.add_scaled(-1.0, &next_y);
            debug!("Candidates move distance:{}", diff.norm2());
        }
    }
    self.data.nxt_y = Arc::new(next_y);
    // Add the new candidate to the list of candidates.
    let candidate_index = self.data.candidates.len();
    let can = Point {
        index: candidate_index,
        point: self.data.nxt_y.clone(),
    };
    self.data.candidates.push(can.clone());
    // Update the candidate in all submodels. The acceptable per-subproblem
    // imprecision is the expected progress scaled by the configured factors
    // and distributed evenly over the subproblems.
    let accept_factor =
        self.params.imprecision_factor * self.params.acceptance_factor * self.data.expected_progress
            / self.problem.num_subproblems().to_f64().unwrap();
    // Reset evaluation data.
    let old_nxt_val = self.data.nxt_val;
    self.data.nxt_val = 0.0;
    for sub in self.data.subs.iter_mut() {
        sub.set_candidate(&can, accept_factor);
        self.data.nxt_val += sub.candidate_value().unwrap().value;
    }
    debug!("RESET old:{} new:{}", old_nxt_val, self.data.nxt_val);
    // Start evaluation of all subproblems at the new candidate.
    for i in 0..self.data.subs.len() {
        self.evaluate_subproblem(i)?;
    }
    self.maybe_do_step(true)
}
/// Do a descent or null step if precision is sufficient.
///
/// Also checks if the termination criterion is satisfied. Termination is
/// only considered when `check_termination` is set, no problem update is
/// running or pending, and more than two descent steps have been done. If
/// the criterion holds but the current center evaluations are inexact, a
/// descent step is forced so the *next* center becomes exact.
///
/// Return values
/// - `Ok(true)` if the termination criterion has been satisfied,
/// - `Ok(false)` if the termination criterion has not been satisfied,
/// - `Err(_)` on error.
fn maybe_do_step(&mut self, check_termination: bool) -> Result<bool, P, M> {
// No step if there is no real new candidate
if self.data.candidate_index == self.data.center_index {
return Ok(false);
}
// Count the subproblems whose candidate evaluation is not yet precise enough.
self.data.num_insufficient_candidates = self.data.subs.iter().filter(|s| !s.is_close_enough).count();
// Number of exact candidate evaluations (used for the forced-descent
// decision below and for logging).
let num_exact = self
.data
.subs
.iter()
.filter(|s| s.candidate_value().unwrap().is_exact())
.count();
// Accumulated evaluation distance over all subproblems (logging only).
let sum_dist = self
.data
.subs
.iter()
.map(|s| s.candidate_value().unwrap().dist)
.sum::<Real>();
debug!(
"Number of insufficient subproblems: {} num exact: {} sum dist: {}",
self.data.num_insufficient_candidates, num_exact, sum_dist,
);
// test if we should terminate
if check_termination
&& self.terminator.terminate(&self.data)
&& !self.data.update_in_progress
&& self.data.cnt_descent > 2
&& !self.data.need_update
{
// All center evaluations are exact: we are done.
if self.data.num_inexact_center.is_zero() {
self.show_info(Step::Term);
info!("Termination criterion satisfied");
return Ok(true);
}
// The termination criterion has been satisfied, but the current
// center evaluations are not exact. We force the current
// *candidate* evaluations to be exact and do a forced descent step.
// This causes the next center to be exact and hopefully satisfying
// the termination criterion, too.
let num_inexact = self.data.subs.len() - num_exact;
debug!(
"Termination criterion satisfied with {} inexact evaluations",
num_inexact
);
// Current candidate is exact, force a descent step.
self.data.force_descent = true;
if num_inexact.is_zero() {
return self.do_step();
}
// Otherwise just wait for the exact evaluations.
return Ok(false);
}
// Not terminating: drop a possibly pending forced descent.
if check_termination {
self.data.force_descent = false;
}
// Perform a step only if all candidate evaluations are precise enough.
if self.data.num_insufficient_candidates == 0 {
self.do_step()
} else {
Ok(false)
}
}
/// Do a null-step or descent-step based on the current candidate data.
///
/// A descent step is taken when the candidate's function value `nxt_val`
/// is at or below the descent bound, or when a descent has been forced.
/// It moves the center to the candidate, updates the proximal weight and
/// the error bound, and informs the master problem. Otherwise a null step
/// only adapts the weight. Both paths trigger a problem update and a new
/// master computation.
///
/// Returns `Ok(true)` iff a descent step was taken and the descent-step
/// counter has reached `max_iter`.
fn do_step(&mut self) -> Result<bool, P, M> {
let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
// Bound the candidate value must not exceed for a descent step.
let descent_bnd = Self::get_descent_bound(self.params.acceptance_factor, &self.data);
debug!(
"Try step from center:{} to candidate:{}",
self.data.center_index, self.data.candidate_index
);
// Test whether we do a descent step
if self.data.nxt_val <= descent_bnd || self.data.force_descent {
debug!("Descent Step{}", if self.data.force_descent { " (forced)" } else { "" });
debug!(" cur_val ={}", self.data.cur_val);
debug!(" nxt_mod ={}", self.data.nxt_mod);
debug!(" nxt_ub ={}", self.data.nxt_val);
debug!(" descent_bnd={}", descent_bnd);
self.data.force_descent = false;
self.data.cnt_descent += 1;
// The candidate becomes the new center.
self.data.center_index = self.data.candidate_index;
// Remember how many center evaluations are inexact; termination (in
// `maybe_do_step`) requires this count to be zero.
self.data.num_inexact_center = self
.data
.subs
.iter()
.filter(|s| !s.candidate_value().unwrap().is_exact())
.count();
// Note that we must update the weight *before* we
// change the internal data, so the old information
// that caused the descent step is still available.
self.data.cur_weight = self.weighter.descent_weight(&self.data);
self.data.cur_y = self.data.nxt_y.clone();
// The new value in the center is the model value in the candidate.
// In particular, it is a lower bound on the real function value.
//
// Note that we do not use the model value `nxt_mod`, but the
// sum of the single model values, because the latter might be higher
// in case of an aggregated model.
self.data.cur_val = self.data.nxt_submods.iter().sum();
// Check if the progress of the last descent step was large enough
// when using the lower bound in the center instead of the former
// guess value.
let error = self.data.subs.iter().map(SubData::error_estimate).sum::<Real>();
let update_l_guess = error > self.data.error_bound;
// save new error bound
self.data.error_bound = self.data.expected_progress * self.params.acceptance_factor;
// Move all subproblems.
let point = Point {
index: self.data.center_index,
point: self.data.cur_y.clone(),
};
// Recompute the candidate value from the moved subproblems.
self.data.nxt_val = Real::zero();
for sub in &mut self.data.subs {
sub.set_center(&point, update_l_guess);
self.data.nxt_val += sub.candidate_value().unwrap().value;
}
self.data.need_update = true;
// Move the master's center and apply the new proximal weight.
master.move_center(1.0, self.data.nxt_d.clone(), self.data.center_index)?;
master.set_weight(self.data.cur_weight)?;
self.master_has_changed = true;
self.show_info(Step::Descent);
self.update_problem(Step::Descent)?;
// We need a new candidate.
self.update_candidate()?;
// Stop when the descent-step (iteration) limit has been reached.
Ok(self.data.cnt_descent >= self.data.max_iter)
} else {
debug!("Null Step nxt_val:{} descent_bnd:{}", self.data.nxt_val, descent_bnd);
// No descent-step, so this is declared a null step
self.data.cur_weight = self.weighter.null_weight(&self.data);
self.show_info(Step::Null);
self.update_problem(Step::Null)?;
// After a null step we need a new candidate, too.
self.update_candidate()?;
Ok(false)
}
}
/// Start evaluation of a subproblem if it is not running.
///
/// The evaluation is started at the current candidate. The candidate
/// is added to the subproblem's candidate list.
///
/// Returns `true` iff a new evaluations has been started.
fn evaluate_subproblem(&mut self, subproblem: usize) -> Result<bool, P, M> {
let sub = &mut self.data.subs[subproblem];
if !sub.is_running && sub.last_eval_index < self.data.candidate_index {
sub.is_running = true;
sub.last_eval_index = sub.last_eval_index.max(self.data.candidate_index);
self.problem
.evaluate(
subproblem,
self.data.nxt_y.clone(),
ChannelResultSender::new(
EvalId {
subproblem,
candidate_index: self.data.candidate_index,
},
self.client_tx.as_ref().ok_or(Error::NotInitialized)?.clone(),
),
)
.map_err(Error::Evaluation)?;
debug!("Evaluate fidx:{} candidate:{}", subproblem, self.data.candidate_index);
Ok(true)
} else {
assert!(sub.is_running || sub.is_close_enough);
Ok(false)
}
}
/// Possibly start a new master process computation.
fn update_candidate(&mut self) -> Result<(), P, M> {
if !self.master_running && self.master_has_changed {
debug!("Start master problem");
self.master_running = true;
self.master_has_changed = false;
self.master_proc
.as_mut()
.ok_or(Error::NotInitialized)?
.solve(self.data.cur_val)?;
}
Ok(())
}
/// Update the problem after descent or null `step`.
///
/// After a successful descent or null step the problem gets a chance to
/// update, i.e. to change some problem properties like adding new
/// variables. This method starts the update process.
///
/// NOTE(review): the body currently returns early (see the `TODO` below),
/// so the update is never actually started and the method always returns
/// `Ok(false)` (after clearing `need_update`).
///
/// Return values
/// - `Ok(true)` if a new update process has been started,
/// - `Ok(false)` if there is already a running update process (only one
/// is allowed at the same time),
/// - `Err(_)` on error.
fn update_problem(&mut self, step: Step) -> Result<bool, P, M> {
// only one update may be running at the same time
if self.data.update_in_progress {
return Ok(false);
}
// Ok, we are doing a new update now ...
self.data.need_update = false;
// TODO: fix this
// NOTE(review): everything after this early return is intentionally
// disabled dead code kept as a scaffold for the pending fix.
return Ok(false);
let master_proc = self.master_proc.as_mut().unwrap();
self.problem
.update(
UpdateData {
cur_y: self.data.cur_y.clone(),
nxt_y: self.data.nxt_y.clone(),
step,
// Collect the aggregated primal solutions of all subproblems
// from the master process.
primal_aggrs: (0..self.problem.num_subproblems())
.map(|i| {
master_proc
.get_aggregated_primal(i)
.map_err(|_| "get_aggregated_primal".to_string())
.expect("Cannot get aggregated primal from master process")
})
.collect(),
},
ChannelUpdateSender::new(
EvalId {
subproblem: 0,
candidate_index: self.data.center_index,
},
self.client_tx.clone().ok_or(Error::NotInitialized)?,
),
)
.map_err(Error::Update)?;
self.data.update_in_progress = true;
Ok(true)
}
/// Handles an update response `update` of the problem.
///
/// The method is called if the problem informs the solver about a change of
/// the problem, e.g. adding a new variable. This method informs the other
/// parts of the solver, e.g. the master problem, about the modification.
///
/// The method returns `Ok(true)` if the master problem has been modified.
fn handle_update_response(&mut self, update: Update<EvalId, P::Primal, P::Err>) -> Result<bool, P, M> {
let mut modified = false;
match update {
Update::AddVariables { bounds, sgext, .. } => {
if !bounds.is_empty() {
// add new variables
self.master_proc
.as_mut()
.unwrap()
.add_vars(bounds.into_iter().map(|(l, u)| (None, l, u)).collect(), sgext)?;
modified = true;
}
}
Update::Done { .. } => self.data.update_in_progress = false, // currently we do nothing ...
Update::Error { err, .. } => {
self.data.update_in_progress = false;
return Err(Error::Update(err));
}
}
Ok(modified)
}
/// Return the bound the function value must be below of to enforce a descent step.
///
/// If the oracle guarantees that $f(\bar{y}) \le$ this bound, the
/// bundle method will perform a descent step.
///
/// This value is $f(\hat{y}) - \varrho \cdot \Delta$ where
/// $\Delta = f(\hat{y}) - \hat{f}(\bar{y})$ is the expected
/// progress and $\varrho$ is the `acceptance_factor`.
fn get_descent_bound(acceptance_factor: Real, data: &SolverData) -> Real {
    // Expected progress: center value minus model value in the candidate.
    let expected_progress = data.cur_val - data.nxt_mod;
    // Same operation order as the original expression, so the floating
    // point result is bit-identical.
    data.cur_val - acceptance_factor * expected_progress
}
/// Log some information about the latest `step`.
fn show_info(&self, step: Step) {
let time = self.start_time.elapsed();
info!(
"{} {:0>2}:{:0>2}:{:0>2}.{:0>2} {:4} {:4}{:1} {:9.4} {:9.4}({:9.4}) \
{:12.6}({:12.6}) {:12.6}({:12.6})",
if step == Step::Term { "_endit" } else { "endit " },
time.as_secs() / 3600,
(time.as_secs() / 60) % 60,
time.as_secs() % 60,
time.subsec_nanos() / 10_000_000,
self.data.cnt_descent,
self.data.cnt_updates,
if step == Step::Descent { "*" } else { " " },
PrettyPrintFloat(self.data.cur_weight),
PrettyPrintFloat(self.data.expected_progress()),
PrettyPrintFloat(self.data.cur_val - self.data.nxt_val),
PrettyPrintFloat(self.data.nxt_mod),
PrettyPrintFloat(self.data.nxt_val),
PrettyPrintFloat(self.data.cur_val),
PrettyPrintFloat(self.data.subs.iter().map(|s| s.center_guess_value()).sum::<Real>()),
);
}
/// Return the aggregated primal of the given subproblem.
pub fn aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, P, M> {
Ok(self
.master_proc
.as_ref()
.ok_or(Error::NotSolved)?
.get_aggregated_primal(subproblem)?)
}
}