Index: examples/cflp.rs ================================================================== --- examples/cflp.rs +++ examples/cflp.rs @@ -29,11 +29,11 @@ use env_logger::{self, fmt::Color}; use ordered_float::NotNan; use threadpool::ThreadPool; -use bundle::problem::{EvalResult, FirstOrderProblem as ParallelProblem, ResultSender}; +use bundle::problem::{FirstOrderProblem as ParallelProblem, ResultSender}; use bundle::solver::sync::{DefaultSolver, NoBundleSolver}; use bundle::{dvec, DVector, Minorant, Real}; const Nfac: usize = 3; const Ncus: usize = 5; @@ -143,19 +143,13 @@ fn stop(&mut self) { self.pool.take(); } - fn evaluate( - &mut self, - i: usize, - y: Arc, - index: I, - tx: ResultSender, - ) -> Result<(), Self::Err> + fn evaluate(&mut self, i: usize, y: Arc, tx: S) -> Result<(), Self::Err> where - I: Send + Copy + 'static, + S: ResultSender + 'static, { if self.pool.is_none() { self.start() } let y = y.clone(); @@ -165,25 +159,18 @@ } else { solve_customer_problem(i - Nfac, &y) }; match res { Ok((value, minorant, primal)) => { - if tx.send(Ok(EvalResult::ObjectiveValue { index, value })).is_err() { + if tx.objective(value).is_err() { return; } - if tx - .send(Ok(EvalResult::Minorant { - index, - minorant, - primal, - })) - .is_err() - { + if tx.minorant(minorant, primal).is_err() { return; } } - Err(err) => if tx.send(Err(err)).is_err() {}, + Err(err) => if tx.error(err).is_err() {}, }; }); Ok(()) } } Index: examples/quadratic.rs ================================================================== --- examples/quadratic.rs +++ examples/quadratic.rs @@ -25,11 +25,11 @@ use rustop::opts; use std::io::Write; use std::sync::Arc; use std::thread; -use bundle::problem::{EvalResult, FirstOrderProblem as ParallelProblem, ResultSender}; +use bundle::problem::{FirstOrderProblem as ParallelProblem, ResultSender}; use bundle::solver::sync::{DefaultSolver, NoBundleSolver}; use bundle::{DVector, Minorant, Real}; #[derive(Clone)] struct QuadraticProblem { @@ -62,19 +62,13 @@ fn start(&mut self) {} fn stop(&mut self) {} - fn evaluate( - &mut self, - fidx: usize, - x: Arc, - index: I, - tx: ResultSender, - ) -> Result<(), Self::Err> + fn evaluate(&mut self, fidx: usize, x: Arc, tx: S) -> Result<(), Self::Err> where - I: Send + Copy + 'static, + S: ResultSender + 'static, { let x = x.clone(); let p = self.clone(); thread::spawn(move || { assert_eq!(fidx, 0); @@ -89,23 +83,18 @@ debug!("Evaluation at {:?}", x); debug!(" objective={}", objective); debug!(" subgradient={}", g); - tx.send(Ok(EvalResult::ObjectiveValue { - index, - value: objective, - })) - .unwrap(); - tx.send(Ok(EvalResult::Minorant { - index, - minorant: Minorant { + tx.objective(objective).unwrap(); + tx.minorant( + Minorant { constant: objective, linear: g, }, - primal: (), - })) + (), + ) .unwrap(); }); Ok(()) } } Index: src/mcf/problem.rs ================================================================== --- src/mcf/problem.rs +++ src/mcf/problem.rs @@ -14,11 +14,11 @@ // along with this program. If not, see // use crate::mcf; use crate::problem::{ - EvalResult, FirstOrderProblem as ParallelProblem, ResultSender, Update as ParallelUpdate, UpdateSender, + FirstOrderProblem as ParallelProblem, ResultSender, Update as ParallelUpdate, UpdateSender, UpdateState as ParallelUpdateState, }; use crate::{DVector, Minorant, Real}; use itertools::izip; @@ -441,19 +441,13 @@ fn stop(&mut self) { self.pool.take(); } - fn evaluate( - &mut self, - i: usize, - y: Arc, - index: I, - tx: ResultSender, - ) -> Result<()> + fn evaluate(&mut self, i: usize, y: Arc, tx: S) -> Result<()> where - I: Send + Copy + 'static, + S: ResultSender + 'static, { if self.pool.is_none() { self.start() } let y = y.clone(); @@ -465,26 +459,21 @@ self.pool .as_ref() .unwrap() .execute(move || match sub.write().unwrap().evaluate(&y, active_constraints) { Ok((objective, subg, primal)) => { - tx.send(Ok(EvalResult::ObjectiveValue { - index, - value: objective, - })) - .unwrap(); - tx.send(Ok(EvalResult::Minorant { - index, - minorant: Minorant { + tx.objective(objective).unwrap(); + tx.minorant( + Minorant { constant: objective, linear: subg, }, primal, - })) + ) .unwrap(); } - Err(err) => tx.send(Err(err)).unwrap(), + Err(err) => tx.error(err).unwrap(), }); Ok(()) } fn update(&mut self, state: U, index: I, tx: UpdateSender) -> Result<()> Index: src/problem.rs ================================================================== --- src/problem.rs +++ src/problem.rs @@ -19,28 +19,26 @@ use crate::{Aggregatable, DVector, Minorant, Real}; use crossbeam::channel::Sender; use std::sync::Arc; -/// Evaluation result. -/// -/// The result of an evaluation is new information to be made -/// available to the solver and the master problem. There are -/// essentially two types of information: -/// -/// 1. The (exact) function value of a sub-function at some point. -/// 2. A minorant of some sub-function. -#[derive(Debug)] -pub enum EvalResult { - /// The objective value at some point. - ObjectiveValue { index: I, value: Real }, - /// A minorant with an associated primal. - Minorant { index: I, minorant: Minorant, primal: P }, -} - /// Channel to send evaluation results to. -pub type ResultSender = Sender, E>>; +pub trait ResultSender

: Send +where + P: FirstOrderProblem, +{ + type Err: std::error::Error + Send; + + /// Send a new objective `value`. + fn objective(&self, value: Real) -> Result<(), Self::Err>; + + /// Send a new `minorant` with associated `primal`. + fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err>; + + /// Send an error message. + fn error(&self, err: P::Err) -> Result<(), Self::Err>; +} /// Problem update information. /// /// The solver calls the `update` method of the problem regularly. /// This method can modify the problem by adding (or moving) @@ -158,19 +156,14 @@ /// /// The results of the evaluation should be passed to the provided channel. /// In order to work correctly, the results must contain (an upper bound on) /// the objective value at $y$ as well as at least one subgradient centered /// at $y$ eventually. - fn evaluate( - &mut self, - i: usize, - y: Arc, - index: I, - tx: ResultSender, - ) -> Result<(), Self::Err> + fn evaluate(&mut self, i: usize, y: Arc, tx: S) -> Result<(), Self::Err> where - I: Send + 'static; + S: ResultSender + 'static, + Self: Sized; /// Called to update the problem. /// /// This method is called regularly by the solver. The problem should send problem update /// information (e.g. adding new variables) to the provided channel. Index: src/solver.rs ================================================================== --- src/solver.rs +++ src/solver.rs @@ -21,5 +21,7 @@ pub use sync::{DefaultSolver, NoBundleSolver}; pub mod asyn; mod masterprocess; + +mod channels; Index: src/solver/asyn.rs ================================================================== --- src/solver/asyn.rs +++ src/solver/asyn.rs @@ -26,13 +26,14 @@ use std::time::Instant; use threadpool::ThreadPool; use crate::{DVector, Real}; +use super::channels::{ChannelSender, EvalResult}; use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse}; use crate::master::{Builder as MasterBuilder, MasterProblem}; -use crate::problem::{EvalResult, FirstOrderProblem, Update, UpdateState}; +use crate::problem::{FirstOrderProblem, Update, UpdateState}; use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator}; use crate::weighter::{HKWeightable, HKWeighter, Weighter}; /// The default iteration limit. pub const DEFAULT_ITERATION_LIMIT: usize = 10_000; @@ -440,11 +441,11 @@ } impl Solver where P: FirstOrderProblem, - P::Err: Into> + 'static, + P::Err: Into> + Send + 'static, T: Terminator> + Default, W: Weighter> + Default, M: MasterBuilder, ::MinorantIndex: std::hash::Hash, { @@ -560,11 +561,11 @@ debug!("Initial problem evaluation"); // We need an initial evaluation of all oracles for the first center. let y = Arc::new(self.data.cur_y.clone()); for i in 0..m { self.problem - .evaluate(i, y.clone(), i, self.client_tx.clone().unwrap()) + .evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap())) .map_err(Error::Evaluation)?; } debug!("Initialization complete"); @@ -878,11 +879,11 @@ // Start evaluation of all subproblems at the new candidate. let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?; for i in 0..self.problem.num_subproblems() { self.problem - .evaluate(i, self.data.nxt_y.clone(), i, client_tx.clone()) + .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone())) .map_err(Error::Evaluation)?; } Ok(false) } ADDED src/solver/channels.rs Index: src/solver/channels.rs ================================================================== --- /dev/null +++ src/solver/channels.rs @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 Frank Fischer + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + */ + +//! Implementation for `ResultSender` using channels. + +use crate::problem::{FirstOrderProblem, ResultSender}; +use crate::{Minorant, Real}; + +use crossbeam::channel::{SendError, Sender}; + +/// Evaluation result. +/// +/// The result of an evaluation is new information to be made +/// available to the solver and the master problem. There are +/// essentially two types of information: +/// +/// 1. The (exact) function value of a sub-function at some point. +/// 2. A minorant of some sub-function. +#[derive(Debug)] +pub enum EvalResult { + /// The objective value at some point. + ObjectiveValue { index: I, value: Real }, + /// A minorant with an associated primal. + Minorant { index: I, minorant: Minorant, primal: P }, +} + +pub struct ChannelSender { + index: I, + tx: Sender, E>>, +} + +impl ChannelSender { + pub fn new(index: I, tx: Sender, E>>) -> Self { + ChannelSender { index, tx } + } +} + +impl ResultSender

for ChannelSender +where + I: Clone + Send, + P: FirstOrderProblem, + P::Err: Send, +{ + type Err = SendError, P::Err>>; + + fn objective(&self, value: Real) -> Result<(), SendError, P::Err>>> { + self.tx.send(Ok(EvalResult::ObjectiveValue { + index: self.index.clone(), + value, + })) + } + + fn minorant( + &self, + minorant: Minorant, + primal: P::Primal, + ) -> Result<(), SendError, P::Err>>> { + self.tx.send(Ok(EvalResult::Minorant { + index: self.index.clone(), + minorant, + primal, + })) + } + + fn error(&self, err: P::Err) -> Result<(), SendError, P::Err>>> { + self.tx.send(Err(err)) + } +} Index: src/solver/sync.rs ================================================================== --- src/solver/sync.rs +++ src/solver/sync.rs @@ -25,13 +25,14 @@ use std::time::Instant; use threadpool::ThreadPool; use crate::{DVector, Real}; +use super::channels::{ChannelSender, EvalResult}; use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse}; use crate::master::{Builder as MasterBuilder, MasterProblem}; -use crate::problem::{EvalResult, FirstOrderProblem, Update, UpdateState}; +use crate::problem::{FirstOrderProblem, Update, UpdateState}; use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator}; use crate::weighter::{HKWeightable, HKWeighter, Weighter}; /// The default iteration limit. pub const DEFAULT_ITERATION_LIMIT: usize = 10_000; @@ -418,11 +419,11 @@ } impl Solver where P: FirstOrderProblem, - P::Err: Into> + 'static, + P::Err: Into> + Send + 'static, T: Terminator + Default, W: Weighter + Default, M: MasterBuilder, ::MinorantIndex: std::hash::Hash, { @@ -538,11 +539,11 @@ debug!("Initial problem evaluation"); // We need an initial evaluation of all oracles for the first center. let y = Arc::new(self.data.cur_y.clone()); for i in 0..m { self.problem - .evaluate(i, y.clone(), i, self.client_tx.clone().unwrap()) + .evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap())) .map_err(Error::Evaluation)?; } debug!("Initialization complete"); @@ -771,11 +772,11 @@ // Start evaluation of all subproblems at the new candidate. let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?; for i in 0..self.problem.num_subproblems() { self.problem - .evaluate(i, self.data.nxt_y.clone(), i, client_tx.clone()) + .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone())) .map_err(Error::Evaluation)?; } Ok(false) }