Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
| Comment: | problem: make `ResultSender` a trait with implementation `ChannelSender` |
|---|---|
| Downloads: | Tarball | ZIP archive |
| Timelines: | family | ancestors | descendants | both | result-sender |
| Files: | files | file ages | folders |
| SHA1: |
43abfc4c67ef4e097e0cecc3a7d04d60 |
| User & Date: | fifr 2019-11-22 09:24:06.348 |
Context
|
2019-11-22
| ||
| 12:00 | Merge trunk check-in: 9198cc93a8 user: fifr tags: result-sender | |
| 09:24 | problem: make `ResultSender` a trait with implementation `ChannelSender` check-in: 43abfc4c67 user: fifr tags: result-sender | |
| 09:01 | problem: hide result channel in opaque type `ResultSender` check-in: 4d0fdfb346 user: fifr tags: result-sender | |
Changes
Changes to examples/cflp.rs.
| ︙ | ︙ | |||
141 142 143 144 145 146 147 |
self.pool = Some(pool)
}
fn stop(&mut self) {
self.pool.take();
}
| < < < < < | | | 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
self.pool = Some(pool)
}
fn stop(&mut self) {
self.pool.take();
}
fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
where
S: ResultSender<Self> + 'static,
{
if self.pool.is_none() {
self.start()
}
let y = y.clone();
self.pool.as_ref().unwrap().execute(move || {
let res = if i < Nfac {
|
| ︙ | ︙ |
Changes to examples/quadratic.rs.
| ︙ | ︙ | |||
60 61 62 63 64 65 66 |
1
}
fn start(&mut self) {}
fn stop(&mut self) {}
| < < < < < | | | 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
1
}
fn start(&mut self) {}
fn stop(&mut self) {}
fn evaluate<S>(&mut self, fidx: usize, x: Arc<DVector>, tx: S) -> Result<(), Self::Err>
where
S: ResultSender<Self> + 'static,
{
let x = x.clone();
let p = self.clone();
thread::spawn(move || {
assert_eq!(fidx, 0);
let mut objective = p.c;
let mut g = dvec![0.0; 2];
|
| ︙ | ︙ |
Changes to src/mcf/problem.rs.
| ︙ | ︙ | |||
439 440 441 442 443 444 445 |
self.pool = Some(pool);
}
fn stop(&mut self) {
self.pool.take();
}
| | | | 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 |
self.pool = Some(pool);
}
fn stop(&mut self) {
self.pool.take();
}
fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<()>
where
S: ResultSender<Self> + 'static,
{
if self.pool.is_none() {
self.start()
}
let y = y.clone();
let sub = self.subs[i].clone();
// Attention: `y` might be shorter than the set of active constraints
|
| ︙ | ︙ |
Changes to src/problem.rs.
| ︙ | ︙ | |||
14 15 16 17 18 19 20 |
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous first-order oracle.
use crate::{Aggregatable, DVector, Minorant, Real};
| | < < < < < < < < < < < < < < < < < < < < < < < | | < | | | | < < < < | | | < < < < < | | | < < | 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous first-order oracle.
use crate::{Aggregatable, DVector, Minorant, Real};
use crossbeam::channel::Sender;
use std::sync::Arc;
/// Channel to send evaluation results to.
///
/// A value implementing this trait is handed to the `evaluate` method of a
/// problem `P`, which reports its results through the methods below.
/// Implementations must be `Send`, i.e. a sender may be moved to another
/// thread before it is used.
pub trait ResultSender<P>: Send
where
P: FirstOrderProblem,
{
/// Error returned by the sender itself when a message cannot be delivered.
///
/// This is distinct from `P::Err`, which describes a failed evaluation and
/// is transported as a regular message via `error`.
type Err: std::error::Error + Send;
/// Send a new objective `value`.
fn objective(&self, value: Real) -> Result<(), Self::Err>;
/// Send a new `minorant` with associated `primal`.
fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err>;
/// Send an error message.
///
/// `err` is the problem's own error value; the returned `Result` only
/// reports whether that message could be handed over.
fn error(&self, err: P::Err) -> Result<(), Self::Err>;
}
/// Problem update information.
///
/// The solver calls the `update` method of the problem regularly.
/// This method can modify the problem by adding (or moving)
/// variables. The possible updates are encoded in this type.
|
| ︙ | ︙ | |||
189 190 191 192 193 194 195 |
/// Start the evaluation of the i^th subproblem at the given point.
///
/// The results of the evaluation should be passed to the provided channel.
/// In order to work correctly, the results must contain (an upper bound on)
/// the objective value at $y$ as well as at least one subgradient centered
/// at $y$ eventually.
| < < < < < | | > | 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
/// Start the evaluation of the i^th subproblem at the given point.
///
/// The results of the evaluation should be passed to the provided channel.
/// In order to work correctly, the results must contain (an upper bound on)
/// the objective value at $y$ as well as at least one subgradient centered
/// at $y$ eventually.
///
/// # Parameters
///
/// * `i` - index of the subproblem to evaluate.
/// * `y` - the point at which the subproblem is evaluated.
/// * `tx` - the `ResultSender` through which objective values, minorants
///   and errors are reported.
///
/// # Errors
///
/// Returns the problem's own error type `Self::Err`.
/// NOTE(review): presumably this signals that the evaluation could not be
/// started; failures during the evaluation itself can be reported through
/// `tx` — confirm against the implementations.
fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
where
S: ResultSender<Self> + 'static,
// `ResultSender<P>` takes a sized type parameter, so `Self` must be `Sized` here.
Self: Sized;
/// Called to update the problem.
///
/// This method is called regularly by the solver. The problem should send problem update
/// information (e.g. adding new variables) to the provided channel.
///
/// The updates might be generated asynchronously.
|
| ︙ | ︙ |
Changes to src/solver.rs.
| ︙ | ︙ | |||
19 20 21 22 23 24 25 |
pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};
pub mod asyn;
mod masterprocess;
| > > | 19 20 21 22 23 24 25 26 27 |
pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};
pub mod asyn;
mod masterprocess;
mod channels;
|
Changes to src/solver/asyn.rs.
| ︙ | ︙ | |||
24 25 26 27 28 29 30 31 32 |
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
| > | | 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// The default solver.
|
| ︙ | ︙ | |||
438 439 440 441 442 443 444 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
| | | 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
T: Terminator<SolverData<P>> + Default,
W: Weighter<SolverData<P>> + Default,
M: MasterBuilder,
<M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
/// Create a new parallel bundle solver.
pub fn new(problem: P) -> Self
|
| ︙ | ︙ | |||
558 559 560 561 562 563 564 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
| | | 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
.evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
.map_err(Error::Evaluation)?;
}
debug!("Initialization complete");
self.start_time = Instant::now();
|
| ︙ | ︙ | |||
876 877 878 879 880 881 882 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
| | | 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
.evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
.map_err(Error::Evaluation)?;
}
Ok(false)
}
/// Update the problem after descent or null `step`.
///
|
| ︙ | ︙ |
Added src/solver/channels.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
/*
* Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! Implementation for `ResultSender` using channels.
use crate::problem::{FirstOrderProblem, ResultSender};
use crate::{Minorant, Real};
use crossbeam::channel::{SendError, Sender};
/// Evaluation result.
///
/// The result of an evaluation is new information to be made
/// available to the solver and the master problem. There are
/// essentially two types of information:
///
/// 1. The (exact) function value of a sub-function at some point.
/// 2. A minorant of some sub-function.
///
/// Type parameters:
///
/// * `I` - type of the tag carried in the `index` field of every variant.
///   The solvers in this crate create their senders with the subproblem
///   index as tag.
/// * `P` - type of the primal information attached to a minorant.
#[derive(Debug)]
pub enum EvalResult<I, P> {
/// The objective value at some point.
///
/// `index` is the tag of the sender that produced this value.
ObjectiveValue { index: I, value: Real },
/// A minorant with an associated primal.
///
/// `index` is the tag of the sender that produced this minorant.
Minorant { index: I, minorant: Minorant, primal: P },
}
/// A result sender backed by a crossbeam channel.
///
/// Type parameters:
///
/// * `I` - type of the tag attached to every `EvalResult` that is sent.
/// * `P` - type of the primal information attached to minorants.
/// * `E` - type of the evaluation errors that can be sent.
pub struct ChannelSender<I, P, E> {
/// Tag that is cloned into every `EvalResult` sent through this sender.
index: I,
/// Sending half of the channel; messages are either `Ok(EvalResult)` or `Err(E)`.
tx: Sender<Result<EvalResult<I, P>, E>>,
}
impl<I, P, E> ChannelSender<I, P, E> {
pub fn new(index: I, tx: Sender<Result<EvalResult<I, P>, E>>) -> Self {
ChannelSender { index, tx }
}
}
/// Forwards the evaluation results of problem `P` over the wrapped channel.
///
/// Objective values and minorants are wrapped in `Ok(EvalResult::..)` and
/// tagged with a clone of the sender's index; evaluation errors are sent as
/// `Err(..)` without a tag.
impl<I, P> ResultSender<P> for ChannelSender<I, P::Primal, P::Err>
where
    I: Clone + Send,
    P: FirstOrderProblem,
    P::Err: Send,
{
    /// The channel's send error, carrying the message that could not be sent.
    type Err = SendError<Result<EvalResult<I, P::Primal>, P::Err>>;

    /// Send the objective `value`, tagged with this sender's index.
    fn objective(&self, value: Real) -> Result<(), Self::Err> {
        let index = self.index.clone();
        let message = Ok(EvalResult::ObjectiveValue { index, value });
        self.tx.send(message)
    }

    /// Send `minorant` together with its `primal`, tagged with this sender's index.
    fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err> {
        let index = self.index.clone();
        let message = Ok(EvalResult::Minorant {
            index,
            minorant,
            primal,
        });
        self.tx.send(message)
    }

    /// Send the evaluation error `err` as an `Err` message.
    fn error(&self, err: P::Err) -> Result<(), Self::Err> {
        let message = Err(err);
        self.tx.send(message)
    }
}
|
Changes to src/solver/sync.rs.
| ︙ | ︙ | |||
23 24 25 26 27 28 29 30 31 |
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
| > | | 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// The default solver.
|
| ︙ | ︙ | |||
416 417 418 419 420 421 422 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
| | | 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
T: Terminator<SolverData> + Default,
W: Weighter<SolverData> + Default,
M: MasterBuilder,
<M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
/// Create a new parallel bundle solver.
pub fn new(problem: P) -> Self
|
| ︙ | ︙ | |||
536 537 538 539 540 541 542 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
| | | 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
.evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
.map_err(Error::Evaluation)?;
}
debug!("Initialization complete");
self.start_time = Instant::now();
|
| ︙ | ︙ | |||
769 770 771 772 773 774 775 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
| | | 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
.evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
.map_err(Error::Evaluation)?;
}
Ok(false)
}
fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
|
| ︙ | ︙ |