/*
* Copyright (c) 2019, 2020 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! Asynchronous process solving a master problem.
#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, Receiver, RecvError, SendError, Sender};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};
use either::Either;
use log::{debug, warn};
use std::sync::Arc;
use threadpool::ThreadPool;
use super::channels::{ClientSender, Message};
use crate::master::MasterProblem;
use crate::problem::{FirstOrderProblem, SubgradientExtender};
use crate::{DVector, Minorant, Real};
#[derive(Debug)]
pub enum Error<MErr, PErr> {
/// The communication channel for sending requests has been disconnected.
DisconnectedSender,
/// The communication channel for sending responds has been disconnected.
DisconnectedReceiver,
/// An error occurred when computing an aggregated primal.
Aggregation(MErr),
/// An error occurred during the subgradient extension.
SubgradientExtension(PErr),
/// An error has been raised by the underlying master problem solver.
Master(MErr),
}
impl<MErr, PErr> std::fmt::Display for Error<MErr, PErr>
where
MErr: std::fmt::Display,
PErr: std::fmt::Display,
{
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
use Error::*;
match self {
DisconnectedSender => writeln!(fmt, "Communication channel to master process has been disconnected"),
DisconnectedReceiver => writeln!(fmt, "Communication channel from master process has been disconnected"),
Aggregation(err) => writeln!(fmt, "Computation of primal aggregate failed: {}", err),
SubgradientExtension(err) => writeln!(fmt, "Subgradient extension failed: {}", err),
Master(err) => writeln!(fmt, "Error in master problem solver: {}", err),
}
}
}
impl<MErr, PErr> std::error::Error for Error<MErr, PErr>
where
MErr: std::error::Error + 'static,
PErr: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use Error::*;
match self {
DisconnectedSender => None,
DisconnectedReceiver => None,
Aggregation(err) => Some(err),
SubgradientExtension(err) => Some(err),
Master(err) => Some(err),
}
}
}
impl<MErr, PErr, T> From<SendError<T>> for Error<MErr, PErr>
where
T: Send + 'static,
{
fn from(_err: SendError<T>) -> Error<MErr, PErr> {
Error::DisconnectedSender
}
}
impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
fn from(_err: RecvError) -> Error<MErr, PErr> {
Error::DisconnectedReceiver
}
}
/// Configuration information for setting up a master problem.
pub struct MasterConfig {
/// The number of subproblems.
pub num_subproblems: usize,
/// The number of variables.
pub num_vars: usize,
/// The lower bounds on the variables.
pub lower_bounds: Option<DVector>,
/// The lower bounds on the variables.
pub upper_bounds: Option<DVector>,
}
/// A task for the master problem.
enum MasterTask<Mn, PErr, M>
where
Mn: Minorant,
M: MasterProblem<Mn>,
{
/// Add new variables with bounds to the master problem.
AddVariables(Vec<(Real, Real)>, Box<SubgradientExtender<Mn, PErr>>),
/// Add a new minorant for a subfunction to the master problem.
AddMinorant(usize, Mn),
/// Move the center of the master problem in the given direction.
MoveCenter(Real, Arc<DVector>),
/// Start a new computation of the master problem.
Solve { center_value: Real },
/// Compress the bundle.
Compress,
/// Set the weight parameter of the master problem.
SetWeight { weight: Real },
/// Return the current aggregated primal.
GetAggregatedPrimal {
subproblem: usize,
tx: Sender<Result<Mn::Primal, M::Err>>,
},
}
/// The response send from a master process.
///
/// The response contains the evaluation results of the latest call to `solve`.
pub enum Response<MErr, PErr> {
/// A regular master result.
Result {
nxt_d: DVector,
nxt_mod: Real,
sgnorm: Real,
/// The number of internal iterations.
cnt_updates: usize,
},
/// An error occurred.
Error(Error<MErr, PErr>),
}
/// Convenient type for `Response`.
pub type MasterResponse<P, M> =
Response<<M as MasterProblem<<P as FirstOrderProblem>::Minorant>>::Err, <P as FirstOrderProblem>::Err>;
type ToMasterSender<P, M> = Sender<MasterTask<<P as FirstOrderProblem>::Minorant, <P as FirstOrderProblem>::Err, M>>;
type ToMasterReceiver<P, M> =
Receiver<MasterTask<<P as FirstOrderProblem>::Minorant, <P as FirstOrderProblem>::Err, M>>;
pub struct MasterProcess<P, M>
where
P: FirstOrderProblem,
M: MasterProblem<P::Minorant>,
{
/// The channel to transmit new tasks to the master problem.
tx: ToMasterSender<P, M>,
phantom: std::marker::PhantomData<M>,
}
impl<P, M> MasterProcess<P, M>
where
P: FirstOrderProblem,
P::Minorant: Send + 'static,
P::Err: Send + 'static,
M: MasterProblem<P::Minorant> + Send + 'static,
M::Err: Send + 'static,
{
pub fn start(master: M, master_config: MasterConfig, tx: ClientSender<P, M>, threadpool: &mut ThreadPool) -> Self {
// Create a pair of communication channels.
let (to_master_tx, to_master_rx) = channel();
// The the master process thread.
threadpool.execute(move || {
debug!("Master process started");
let mut from_master_tx = tx;
let mut sgext = None;
if let Err(err) = Self::master_main(master, master_config, &mut from_master_tx, to_master_rx, &mut sgext) {
#[allow(unused_must_use)]
{
from_master_tx.send(Message::Master(Response::Error(err)));
}
}
debug!("Master proces stopped");
});
MasterProcess {
tx: to_master_tx,
phantom: std::marker::PhantomData,
}
}
/// Add new variables to the master problem.
pub fn add_vars(
&mut self,
vars: Vec<(Real, Real)>,
sgext: Box<SubgradientExtender<P::Minorant, P::Err>>,
) -> Result<(), Error<M::Err, P::Err>>
where
P::Err: 'static,
{
Ok(self.tx.send(MasterTask::AddVariables(vars, sgext))?)
}
/// Add a new minorant to the master problem model.
///
/// This adds the specified `minorant` with associated `primal` data to the
/// model of subproblem `i`.
pub fn add_minorant(&mut self, i: usize, minorant: P::Minorant) -> Result<(), Error<M::Err, P::Err>> {
Ok(self.tx.send(MasterTask::AddMinorant(i, minorant))?)
}
/// Move the center of the master problem.
///
/// This moves the master problem's center in direction $\\alpha \\cdot d$.
pub fn move_center(&mut self, alpha: Real, d: Arc<DVector>) -> Result<(), Error<M::Err, P::Err>> {
Ok(self.tx.send(MasterTask::MoveCenter(alpha, d))?)
}
/// Solve the master problem.
///
/// The current function value in the center `center_value`.
/// Once the master problem is solved the process will send a
/// [`Response`] message to the `tx` channel.
pub fn solve(&mut self, center_value: Real) -> Result<(), Error<M::Err, P::Err>> {
Ok(self.tx.send(MasterTask::Solve { center_value })?)
}
/// Compresses the model.
pub fn compress(&mut self) -> Result<(), Error<M::Err, P::Err>> {
Ok(self.tx.send(MasterTask::Compress)?)
}
/// Sets the new weight of the proximal term in the master problem.
pub fn set_weight(&mut self, weight: Real) -> Result<(), Error<M::Err, P::Err>> {
Ok(self.tx.send(MasterTask::SetWeight { weight })?)
}
/// Get the current aggregated primal for a certain subproblem.
pub fn get_aggregated_primal(
&self,
subproblem: usize,
) -> Result<<P::Minorant as Minorant>::Primal, Error<M::Err, P::Err>> {
let (tx, rx) = channel();
self.tx.send(MasterTask::GetAggregatedPrimal { subproblem, tx })?;
rx.recv()?.map_err(Error::Aggregation)
}
/// The main loop of the master process.
fn master_main(
master: M,
master_config: MasterConfig,
tx: &mut ClientSender<P, M>,
rx: ToMasterReceiver<P, M>,
subgradient_extender: &mut Option<Box<SubgradientExtender<P::Minorant, P::Err>>>,
) -> Result<(), Error<M::Err, P::Err>> {
let mut master = master;
// Initialize the master problem.
master
.set_num_subproblems(master_config.num_subproblems)
.map_err(Error::Master)?;
master
.set_vars(
master_config.num_vars,
master_config.lower_bounds,
master_config.upper_bounds,
)
.map_err(Error::Master)?;
// The main iteration: wait for new tasks.
for m in rx {
match m {
MasterTask::AddVariables(vars, mut sgext) => {
debug!("master: add {} variables to the subproblem", vars.len());
master.add_vars(&vars, &mut sgext).map_err(|err| match err {
Either::Left(err) => Error::Master(err),
Either::Right(err) => Error::SubgradientExtension(err),
})?;
*subgradient_extender = Some(sgext);
}
MasterTask::AddMinorant(i, mut m) => {
debug!("master: add minorant to subproblem {}", i);
// It may happen the number new minorant belongs to an earlier evaluation
// with less variables (i.e. new variables have been added
// after the start of the evaluation but before the new
// minorant is added, i.e. now). In this case we must add
// extend the minorant accordingly.
if m.dim() < master.num_variables() {
if let Some(ref mut sgext) = subgradient_extender {
sgext(i, &mut m).map_err(Error::<M::Err, P::Err>::SubgradientExtension)?;
}
}
master.add_minorant(i, m).map_err(Error::Master)?;
}
MasterTask::MoveCenter(alpha, d) => {
debug!("master: move center");
master.move_center(alpha, &d);
}
MasterTask::Compress => {
debug!("Compress bundle");
master.compress().map_err(Error::Master)?;
}
MasterTask::Solve { center_value } => {
debug!("master: solve with center_value {}", center_value);
master.solve(center_value).map_err(Error::Master)?;
let master_response = Response::Result {
nxt_d: master.get_primopt(),
nxt_mod: master.get_primoptval(),
sgnorm: master.get_dualoptnorm2().sqrt(),
cnt_updates: master.cnt_updates(),
};
if let Err(err) = tx.send(Message::Master(master_response)) {
warn!("Master process cancelled because of channel error: {}", err);
break;
}
}
MasterTask::SetWeight { weight } => {
debug!("master: set weight {}", weight);
master.set_weight(weight).map_err(Error::Master)?;
}
MasterTask::GetAggregatedPrimal { subproblem, tx } => {
debug!("master: get aggregated primal for {}", subproblem);
if tx.send(master.aggregated_primal(subproblem)).is_err() {
warn!("Sending of aggregated primal for {} failed", subproblem);
}
}
};
}
Ok(())
}
}