RsBundle  Artifact [97d322030c]

Artifact 97d322030c5e96fe1b069d2ee6877b2da75ddd9d:

  • File src/solver/masterprocess.rs — part of check-in [e12bd4bd5d] at 2020-07-18 18:49:23 on branch async — Merge trunk (user: fifr size: 13076) [more...]

/*
 * Copyright (c) 2019, 2020 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! Asynchronous process solving a master problem.

#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, Receiver, RecvError, SendError, Sender};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};

use either::Either;
use log::{debug, warn};
use std::sync::Arc;
use thiserror::Error;
use threadpool::ThreadPool;

use super::channels::{ClientSender, Message};
use crate::master::MasterProblem;
use crate::problem::{FirstOrderProblem, SubgradientExtender};
use crate::{Aggregatable, DVector, Minorant, Real};

/// Convenient type alias for master process errors.
pub type Error<P, M> =
    MasterError<<P as FirstOrderProblem>::Err, <M as MasterProblem<<P as FirstOrderProblem>::Primal>>::Err>;

/// Result type for the master process.
pub type Result<T, P, M> = std::result::Result<T, Error<P, M>>;

#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MasterError<PErr, MErr>
where
    PErr: std::error::Error + 'static,
    MErr: std::error::Error + 'static,
{
    #[error("sending channel to master has been disconnected")]
    DisconnectedSender(#[source] Box<dyn std::error::Error + Send>),
    #[error("receiving channel from master has been disconnected")]
    DisconnectedReceiver(#[source] RecvError),
    #[error("subgradient extension failed")]
    SubgradientExtension(#[source] PErr),
    #[error("master problem error")]
    Master(#[source] MErr),
}

impl<T, PErr, MErr> From<SendError<T>> for MasterError<PErr, MErr>
where
    T: Send + 'static,
    PErr: std::error::Error + 'static,
    MErr: std::error::Error + 'static,
{
    fn from(err: SendError<T>) -> MasterError<PErr, MErr> {
        MasterError::DisconnectedSender(Box::new(err))
    }
}

impl<PErr, MErr> From<RecvError> for MasterError<PErr, MErr>
where
    PErr: std::error::Error + 'static,
    MErr: std::error::Error + 'static,
{
    fn from(err: RecvError) -> MasterError<PErr, MErr> {
        MasterError::DisconnectedReceiver(err)
    }
}

/// Configuration information for setting up a master problem.
pub struct MasterConfig {
    /// The number of subproblems.
    pub num_subproblems: usize,
    /// The number of variables.
    pub num_vars: usize,
    /// The lower bounds on the variables.
    pub lower_bounds: Option<DVector>,
    /// The lower bounds on the variables.
    pub upper_bounds: Option<DVector>,
}

/// A task for the master problem.
enum MasterTask<Pr, PErr, MErr>
where
    Pr: Aggregatable,
{
    /// Add new variables to the master problem.
    AddVariables(Vec<(Option<usize>, Real, Real)>, Box<SubgradientExtender<Pr, PErr>>),

    /// Add a new minorant for a subfunction to the master problem.
    AddMinorant(usize, Minorant<Pr>),

    /// Move the center of the master problem in the given direction.
    ///
    /// The third value is a unique identifier for the current center.
    MoveCenter(Real, Arc<DVector>, usize),

    /// Start a new computation of the master problem.
    Solve { center_value: Real },

    /// Compress the bundle.
    Compress,

    /// Set the weight parameter of the master problem.
    SetWeight { weight: Real },

    /// Return the current aggregated primal.
    GetAggregatedPrimal {
        subproblem: usize,
        tx: Sender<std::result::Result<Pr, MErr>>,
    },
}

/// The response send from a master process.
///
/// The response contains the evaluation results of the latest call to `solve`.
pub enum Response<E> {
    /// A regular master result.
    Result {
        nxt_d: DVector,
        /// Aggregated model value.
        nxt_mod: Real,
        /// Submodel values.
        nxt_submods: Vec<Real>,
        /// Aggregated subgradient norm.
        sgnorm: Real,
        /// The number of internal iterations.
        cnt_updates: usize,
        /// The index of the center.
        center_index: usize,
    },
    /// An error occurred.
    Error(E),
}

/// Convenient type for `Response`.
pub type MasterResponse<P, M> = Response<Error<P, M>>;

type MasterTsk<P, M> = MasterTask<
    <P as FirstOrderProblem>::Primal,
    <P as FirstOrderProblem>::Err,
    <M as MasterProblem<<P as FirstOrderProblem>::Primal>>::Err,
>;

type ToMasterSender<P, M> = Sender<MasterTsk<P, M>>;

type ToMasterReceiver<P, M> = Receiver<MasterTsk<P, M>>;

pub struct MasterProcess<P, M>
where
    P: FirstOrderProblem,
    M: MasterProblem<P::Primal>,
{
    /// The channel to transmit new tasks to the master problem.
    tx: ToMasterSender<P, M>,

    phantom: std::marker::PhantomData<M>,
}

impl<P, M> MasterProcess<P, M>
where
    P: FirstOrderProblem,
    P::Primal: Send + 'static,
    P::Err: Send + 'static,
    M: MasterProblem<P::Primal> + Send + 'static,
    M::Err: Send + 'static,
{
    pub fn start<I>(
        master: M,
        master_config: MasterConfig,
        tx: ClientSender<I, P, M>,
        threadpool: &mut ThreadPool,
    ) -> Self
    where
        I: Send + 'static,
    {
        // Create a pair of communication channels.
        let (to_master_tx, to_master_rx) = channel();

        // The the master process thread.
        threadpool.execute(move || {
            debug!("Master process started");
            let mut from_master_tx = tx;
            let mut sgext = None;
            if let Err(err) = Self::master_main(master, master_config, &mut from_master_tx, to_master_rx, &mut sgext) {
                #[allow(unused_must_use)]
                {
                    from_master_tx.send(Message::Master(Response::Error(err)));
                }
            }
            debug!("Master proces stopped");
        });

        MasterProcess {
            tx: to_master_tx,
            phantom: std::marker::PhantomData,
        }
    }

    /// Add new variables to the master problem.
    pub fn add_vars(
        &mut self,
        vars: Vec<(Option<usize>, Real, Real)>,
        sgext: Box<SubgradientExtender<P::Primal, P::Err>>,
    ) -> Result<(), P, M>
    where
        P::Err: 'static,
    {
        Ok(self.tx.send(MasterTask::AddVariables(vars, sgext))?)
    }

    /// Add a new minorant to the master problem model.
    ///
    /// This adds the specified `minorant` with associated `primal` data to the
    /// model of subproblem `i`.
    pub fn add_minorant(&mut self, i: usize, minorant: Minorant<P::Primal>) -> Result<(), P, M> {
        Ok(self.tx.send(MasterTask::AddMinorant(i, minorant))?)
    }

    /// Move the center of the master problem.
    ///
    /// This moves the master problem's center in direction $\\alpha \\cdot d$.
    pub fn move_center(&mut self, alpha: Real, d: Arc<DVector>, center_index: usize) -> Result<(), P, M> {
        Ok(self.tx.send(MasterTask::MoveCenter(alpha, d, center_index))?)
    }

    /// Solve the master problem.
    ///
    /// The current function value in the center `center_value`.
    /// Once the master problem is solved the process will send a
    /// [`Response`] message to the `tx` channel.
    pub fn solve(&mut self, center_value: Real) -> Result<(), P, M> {
        Ok(self.tx.send(MasterTask::Solve { center_value })?)
    }

    /// Compresses the model.
    pub fn compress(&mut self) -> Result<(), P, M> {
        Ok(self.tx.send(MasterTask::Compress)?)
    }

    /// Sets the new weight of the proximal term in the master problem.
    pub fn set_weight(&mut self, weight: Real) -> Result<(), P, M> {
        Ok(self.tx.send(MasterTask::SetWeight { weight })?)
    }

    /// Get the current aggregated primal for a certain subproblem.
    pub fn get_aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, P, M> {
        let (tx, rx) = channel();
        self.tx.send(MasterTask::GetAggregatedPrimal { subproblem, tx })?;
        rx.recv()?.map_err(MasterError::Master)
    }

    /// The main loop of the master process.
    fn master_main<I>(
        master: M,
        master_config: MasterConfig,
        tx: &mut ClientSender<I, P, M>,
        rx: ToMasterReceiver<P, M>,
        subgradient_extender: &mut Option<Box<SubgradientExtender<P::Primal, P::Err>>>,
    ) -> Result<(), P, M> {
        let mut master = master;
        let mut center_idx = 0;

        // Initialize the master problem.
        master
            .set_num_subproblems(master_config.num_subproblems)
            .map_err(MasterError::Master)?;
        master
            .set_vars(
                master_config.num_vars,
                master_config.lower_bounds,
                master_config.upper_bounds,
            )
            .map_err(MasterError::Master)?;

        // The main iteration: wait for new tasks.
        for m in rx {
            match m {
                MasterTask::AddVariables(vars, mut sgext) => {
                    debug!("master: add {} variables to the subproblem", vars.len());
                    master.add_vars(&vars, &mut sgext).map_err(|err| match err {
                        Either::Left(err) => MasterError::Master(err),
                        Either::Right(err) => MasterError::SubgradientExtension(err),
                    })?;
                    *subgradient_extender = Some(sgext);
                }
                MasterTask::AddMinorant(i, mut m) => {
                    debug!("master: add minorant to subproblem {}", i);
                    // It may happen the number new minorant belongs to an earlier evaluation
                    // with less variables (i.e. new variables have been added
                    // after the start of the evaluation but before the new
                    // minorant is added, i.e. now). In this case we must add
                    // extend the minorant accordingly.
                    if m.linear.len() < master.num_variables() {
                        if let Some(ref mut sgext) = subgradient_extender {
                            let newinds = (m.linear.len()..master.num_variables()).collect::<Vec<_>>();
                            let new_subg =
                                sgext(i, &m.primal, &newinds).map_err(Error::<P, M>::SubgradientExtension)?;
                            m.linear.extend(new_subg);
                        }
                    }
                    master.add_minorant(i, m).map_err(MasterError::Master)?;
                }
                MasterTask::MoveCenter(alpha, d, center_index) => {
                    debug!("master: move center to {}", center_index);
                    master.move_center(alpha, &d);
                    center_idx = center_index;
                }
                MasterTask::Compress => {
                    debug!("master: compress bundle");
                    master.compress().map_err(MasterError::Master)?;
                }
                MasterTask::Solve { center_value } => {
                    debug!("master: solve with center_value {}", center_value);
                    master.fill_models(|_, _| {}).map_err(MasterError::Master)?;
                    master.solve(center_value).map_err(MasterError::Master)?;
                    let master_response = Response::Result {
                        nxt_d: master.get_primopt(),
                        nxt_mod: master.get_primoptval(),
                        nxt_submods: (0..master.num_subproblems())
                            .map(|i| master.get_optvalue_subproblem(i))
                            .collect(),
                        sgnorm: master.get_dualoptnorm2().sqrt(),
                        cnt_updates: master.cnt_updates(),
                        center_index: center_idx,
                    };
                    if let Err(err) = tx.send(Message::Master(master_response)) {
                        warn!("Master process cancelled because of channel error: {}", err);
                        break;
                    }
                }
                MasterTask::SetWeight { weight } => {
                    debug!("master: set weight {}", weight);
                    master.set_weight(weight).map_err(MasterError::Master)?;
                }
                MasterTask::GetAggregatedPrimal { subproblem, tx } => {
                    debug!("master: get aggregated primal for {}", subproblem);
                    if tx.send(master.aggregated_primal(subproblem)).is_err() {
                        warn!("Sending of aggregated primal for {} failed", subproblem);
                    }
                }
            };
        }

        Ok(())
    }
}