RsBundle  Artifact [2f9accaa92]

Artifact 2f9accaa92e1718cd76fcd78e538f67dd2de5677:

  • File src/solver/masterprocess.rs — part of check-in [62c311d2a4] at 2019-07-30 07:40:07 on branch restructure — Move `parallel` to `solver::sync` (user: fifr size: 9799) [more...]

/*
 * Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! Asynchronous process solving a master problem.

use crossbeam::channel::{unbounded as channel, Receiver, Sender};
use log::{debug, warn};
use std::sync::Arc;
use threadpool::ThreadPool;

use super::sync::Error;
use crate::master::primalmaster::PrimalMaster;
use crate::master::MasterProblem;
use crate::problem::{FirstOrderProblem, SubgradientExtender};
use crate::{DVector, Minorant, Real};

/// Configuration information for setting up a master problem.
///
/// The values are applied once by the master process before it starts
/// handling tasks.
pub struct MasterConfig {
    /// The number of subproblems.
    pub num_subproblems: usize,
    /// The number of variables.
    pub num_vars: usize,
    /// The lower bounds on the variables.
    // NOTE(review): `None` presumably means "unbounded below" — confirm against `MasterProblem::set_vars`.
    pub lower_bounds: Option<DVector>,
    /// The upper bounds on the variables.
    // NOTE(review): `None` presumably means "unbounded above" — confirm against `MasterProblem::set_vars`.
    pub upper_bounds: Option<DVector>,
}

/// A task for the master problem.
///
/// Tasks are sent over a channel to the master process, which handles them
/// one after another in its main loop.
///
/// Type parameters: `Pr` is the primal information attached to minorants,
/// `PErr` the error type of the subgradient extender and `M` the master
/// problem implementation.
enum MasterTask<Pr, PErr, M>
where
    M: MasterProblem,
{
    /// Add new variables to the master problem.
    ///
    /// The first field describes the new variables, the second is the
    /// subgradient extender handed to the master problem along with them.
    // NOTE(review): the tuple is presumably (optional index, lower bound, upper bound) — confirm.
    AddVariables(Vec<(Option<usize>, Real, Real)>, Box<SubgradientExtender<Pr, PErr>>),

    /// Add a new minorant for a subfunction to the master problem.
    ///
    /// The fields are the index of the subproblem, the minorant and the
    /// primal data associated with the minorant.
    AddMinorant(usize, Minorant, Pr),

    /// Move the center of the master problem in the given direction.
    ///
    /// The fields are the step size `alpha` and the direction `d`.
    MoveCenter(Real, Arc<DVector>),

    /// Start a new computation of the master problem.
    ///
    /// `center_value` is the current function value in the center.
    Solve { center_value: Real },

    /// Compress the bundle.
    Compress,

    /// Set the weight parameter of the master problem.
    SetWeight { weight: Real },

    /// Return the current aggregated primal.
    ///
    /// The aggregated primal of subproblem `subproblem` (or the error of
    /// the master problem) is sent back through `tx`.
    GetAggregatedPrimal {
        subproblem: usize,
        tx: Sender<Result<Pr, M::Err>>,
    },
}

/// The response sent from a master process.
///
/// The response contains the results of the latest solve of the master
/// problem.
pub struct MasterResponse {
    /// The primal optimal solution of the master problem (`get_primopt`).
    // NOTE(review): judging by the name this is the next direction/step — confirm.
    pub nxt_d: DVector,
    /// The primal optimal value of the master problem (`get_primoptval`).
    // NOTE(review): judging by the name this is the model value at the next point — confirm.
    pub nxt_mod: Real,
    /// The square root of the master problem's `get_dualoptnorm2` value.
    // NOTE(review): presumably the norm of the aggregated subgradient — confirm.
    pub sgnorm: Real,
    /// The number of internal iterations.
    pub cnt_updates: usize,
}

/// The sending end of the channel carrying tasks to the master process.
type ToMasterSender<P, M> = Sender<MasterTask<<P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err, M>>;

/// The receiving end of the channel carrying tasks to the master process.
type ToMasterReceiver<P, M> = Receiver<MasterTask<<P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err, M>>;

/// The sending end of the channel carrying responses (or an error of type
/// `E`) from the master process.
type MasterSender<E> = Sender<Result<MasterResponse, E>>;

/// The receiving end of the channel carrying responses (or an error of type
/// `E`) from the master process.
pub type MasterReceiver<E> = Receiver<Result<MasterResponse, E>>;

/// A handle to a master problem that is solved asynchronously.
///
/// The master problem itself lives in a job running on a thread pool. This
/// handle only holds the two channel ends used to talk to that job: tasks
/// are pushed through `tx` and results are received from `rx`.
pub struct MasterProcess<P, M>
where
    P: FirstOrderProblem,
    M: MasterProblem,
{
    /// The channel to transmit new tasks to the master problem.
    tx: ToMasterSender<P, M>,

    /// The channel to receive solutions from the master problem.
    ///
    /// Each message is either the response to a solve request or the error
    /// that stopped the master process.
    pub rx: MasterReceiver<M::Err>,

    /// Marker for the master problem type `M`; carries no data.
    phantom: std::marker::PhantomData<M>,
}

impl<P, M> MasterProcess<P, M>
where
    P: FirstOrderProblem,
    P::Primal: Send + 'static,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + 'static,
    M: MasterProblem + Send + 'static,
    M::MinorantIndex: std::hash::Hash,
    M::Err: Send + 'static,
{
    pub fn start(master: M, master_config: MasterConfig, threadpool: &mut ThreadPool) -> Self {
        // Create a pair of communication channels.
        let (to_master_tx, to_master_rx) = channel();
        let (from_master_tx, from_master_rx) = channel();

        // The the master process thread.
        threadpool.execute(move || {
            debug!("Master process started");
            let mut from_master_tx = from_master_tx;
            if let Err(err) = Self::master_main(master, master_config, &mut from_master_tx, to_master_rx) {
                #[allow(unused_must_use)]
                {
                    from_master_tx.send(Err(err));
                }
            }
            debug!("Master proces stopped");
        });

        MasterProcess {
            tx: to_master_tx,
            rx: from_master_rx,
            phantom: std::marker::PhantomData,
        }
    }

    /// Add new variables to the master problem.
    pub fn add_vars(
        &mut self,
        vars: Vec<(Option<usize>, Real, Real)>,
        sgext: Box<SubgradientExtender<P::Primal, P::Err>>,
    ) -> Result<(), Error<P::Err>>
    where
        P::Err: 'static,
    {
        self.tx
            .send(MasterTask::AddVariables(vars, sgext))
            .map_err(|err| Error::Process(err.into()))
    }

    /// Add a new minorant to the master problem model.
    ///
    /// This adds the specified `minorant` with associated `primal` data to the
    /// model of subproblem `i`.
    pub fn add_minorant(&mut self, i: usize, minorant: Minorant, primal: P::Primal) -> Result<(), Error<P::Err>> {
        self.tx
            .send(MasterTask::AddMinorant(i, minorant, primal))
            .map_err(|err| Error::Process(err.into()))
    }

    /// Move the center of the master problem.
    ///
    /// This moves the master problem's center in direction $\\alpha \\cdot d$.
    pub fn move_center(&mut self, alpha: Real, d: Arc<DVector>) -> Result<(), Error<P::Err>> {
        self.tx
            .send(MasterTask::MoveCenter(alpha, d))
            .map_err(|err| Error::Process(err.into()))
    }

    /// Solve the master problem.
    ///
    /// The current function value in the center `center_value`.
    /// Once the master problem is solved the process will send a
    /// [`MasterResponse`] message to the `tx` channel.
    pub fn solve(&mut self, center_value: Real) -> Result<(), Error<P::Err>> {
        self.tx
            .send(MasterTask::Solve { center_value })
            .map_err(|err| Error::Process(err.into()))
    }

    /// Compresses the model.
    pub fn compress(&mut self) -> Result<(), Error<P::Err>> {
        self.tx
            .send(MasterTask::Compress)
            .map_err(|err| Error::Process(err.into()))
    }

    /// Sets the new weight of the proximal term in the master problem.
    pub fn set_weight(&mut self, weight: Real) -> Result<(), Error<P::Err>> {
        self.tx
            .send(MasterTask::SetWeight { weight })
            .map_err(|err| Error::Process(err.into()))
    }

    /// Get the current aggregated primal for a certain subproblem.
    pub fn get_aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, Error<P::Err>> {
        let (tx, rx) = channel();
        self.tx
            .send(MasterTask::GetAggregatedPrimal { subproblem, tx })
            .map_err(|err| Error::Process(err.into()))?;
        rx.recv()
            .map_err(|err| Error::Process(err.into()))?
            .map_err(|err| Error::Master(err.into()))
    }

    /// The main loop of the master process.
    fn master_main(
        master: M,
        master_config: MasterConfig,
        tx: &mut MasterSender<M::Err>,
        rx: ToMasterReceiver<P, M>,
    ) -> Result<(), M::Err> {
        let mut master = PrimalMaster::<_, P::Primal>::new(master);

        // Initialize the master problem.
        master.set_num_subproblems(master_config.num_subproblems)?;
        master.set_vars(
            master_config.num_vars,
            master_config.lower_bounds,
            master_config.upper_bounds,
        )?;

        // The main iteration: wait for new tasks.
        for m in rx {
            match m {
                MasterTask::AddVariables(vars, sgext) => {
                    debug!("master: add {} variables to the subproblem", vars.len());
                    master.add_vars(vars, sgext)?;
                }
                MasterTask::AddMinorant(i, m, primal) => {
                    debug!("master: add minorant to subproblem {}", i);
                    master.add_minorant(i, m, primal)?;
                }
                MasterTask::MoveCenter(alpha, d) => {
                    debug!("master: move center");
                    master.move_center(alpha, &d);
                }
                MasterTask::Compress => {
                    debug!("Compress bundle");
                    master.compress()?;
                }
                MasterTask::Solve { center_value } => {
                    debug!("master: solve with center_value {}", center_value);
                    master.solve(center_value)?;
                    let master_response = MasterResponse {
                        nxt_d: master.get_primopt(),
                        nxt_mod: master.get_primoptval(),
                        sgnorm: master.get_dualoptnorm2().sqrt(),
                        cnt_updates: master.cnt_updates(),
                    };
                    if let Err(err) = tx.send(Ok(master_response)) {
                        warn!("Master process cancelled because of channel error: {}", err);
                        break;
                    }
                }
                MasterTask::SetWeight { weight } => {
                    debug!("master: set weight {}", weight);
                    master.set_weight(weight)?;
                }
                MasterTask::GetAggregatedPrimal { subproblem, tx } => {
                    debug!("master: get aggregated primal for {}", subproblem);
                    if tx.send(master.aggregated_primal(subproblem)).is_err() {
                        warn!("Sending of aggregated primal for {} failed", subproblem);
                    };
                }
            };
        }

        Ok(())
    }
}