RsBundle  Check-in [43abfc4c67]

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:problem: make `ResultSender` a trait with implementation `ChannelSender`
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | result-sender
Files: files | file ages | folders
SHA1: 43abfc4c67ef4e097e0cecc3a7d04d60804056ff
User & Date: fifr 2019-11-22 09:24:06.348
Context
2019-11-22
12:00
Merge trunk check-in: 9198cc93a8 user: fifr tags: result-sender
09:24
problem: make `ResultSender` a trait with implementation `ChannelSender` check-in: 43abfc4c67 user: fifr tags: result-sender
09:01
problem: hide result channel in opaque type `ResultSender` check-in: 4d0fdfb346 user: fifr tags: result-sender
Changes
Unified Diff Ignore Whitespace Patch
Changes to examples/cflp.rs.
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
        self.pool = Some(pool)
    }

    /// Release the worker pool.
    ///
    /// `self.pool` is reset to `None`, which drops the pool stored there (if
    /// any). `evaluate` calls `start` again when it finds no pool.
    fn stop(&mut self) {
        self.pool = None;
    }

    fn evaluate<I>(
        &mut self,
        i: usize,
        y: Arc<DVector>,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<(), Self::Err>
    where
        I: Send + Copy + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        self.pool.as_ref().unwrap().execute(move || {
            let res = if i < Nfac {







<
<
<
<
<
|

|







141
142
143
144
145
146
147





148
149
150
151
152
153
154
155
156
157
        self.pool = Some(pool)
    }

    /// Release the worker pool.
    ///
    /// `self.pool` is reset to `None`, which drops the pool stored there (if
    /// any). `evaluate` calls `start` again when it finds no pool.
    fn stop(&mut self) {
        self.pool = None;
    }






    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
    where
        S: ResultSender<Self> + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        self.pool.as_ref().unwrap().execute(move || {
            let res = if i < Nfac {
Changes to examples/quadratic.rs.
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
        1
    }

    /// No-op: nothing is set up here (the visible `evaluate` spawns a fresh
    /// thread per call instead of using a pool).
    fn start(&mut self) {}

    /// No-op: `start` sets nothing up, so there is nothing to tear down.
    fn stop(&mut self) {}

    fn evaluate<I>(
        &mut self,
        fidx: usize,
        x: Arc<DVector>,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<(), Self::Err>
    where
        I: Send + Copy + 'static,
    {
        let x = x.clone();
        let p = self.clone();
        thread::spawn(move || {
            assert_eq!(fidx, 0);
            let mut objective = p.c;
            let mut g = dvec![0.0; 2];







<
<
<
<
<
|

|







60
61
62
63
64
65
66





67
68
69
70
71
72
73
74
75
76
        1
    }

    /// No-op: nothing is set up here (the visible `evaluate` spawns a fresh
    /// thread per call instead of using a pool).
    fn start(&mut self) {}

    /// No-op: `start` sets nothing up, so there is nothing to tear down.
    fn stop(&mut self) {}






    fn evaluate<S>(&mut self, fidx: usize, x: Arc<DVector>, tx: S) -> Result<(), Self::Err>
    where
        S: ResultSender<Self> + 'static,
    {
        let x = x.clone();
        let p = self.clone();
        thread::spawn(move || {
            assert_eq!(fidx, 0);
            let mut objective = p.c;
            let mut g = dvec![0.0; 2];
Changes to src/mcf/problem.rs.
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
        self.pool = Some(pool);
    }

    /// Release the worker pool.
    ///
    /// `self.pool` is reset to `None`, which drops the pool stored there (if
    /// any). `evaluate` calls `start` again when it finds no pool.
    fn stop(&mut self) {
        self.pool = None;
    }

    fn evaluate<I>(&mut self, i: usize, y: Arc<DVector>, tx: ResultSender<I, Self::Primal, Self::Err>) -> Result<()>
    where
        I: Send + Copy + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        let sub = self.subs[i].clone();
        // Attention: `y` might be shorter than the set of active constraints







|

|







439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
        self.pool = Some(pool);
    }

    /// Release the worker pool.
    ///
    /// `self.pool` is reset to `None`, which drops the pool stored there (if
    /// any). `evaluate` calls `start` again when it finds no pool.
    fn stop(&mut self) {
        self.pool = None;
    }

    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<()>
    where
        S: ResultSender<Self> + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        let sub = self.subs[i].clone();
        // Attention: `y` might be shorter than the set of active constraints
Changes to src/problem.rs.
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! An asynchronous first-order oracle.

use crate::{Aggregatable, DVector, Minorant, Real};
use crossbeam::channel::{SendError, Sender};
use std::sync::Arc;

/// Result of evaluating a sub-function.
///
/// Every evaluation produces new information that is made available to
/// the solver and the master problem. There are essentially two kinds
/// of information:
///
///    1. The (exact) function value of a sub-function at some point.
///    2. A minorant of some sub-function.
///
/// `I` is the type of the index each result is tagged with and `P` the
/// type of the primal information attached to a minorant.
#[derive(Debug)]
pub enum EvalResult<I, P> {
    /// The objective value at some point.
    ObjectiveValue {
        /// Index this value is tagged with (copied from the sender).
        index: I,
        /// The objective value itself.
        value: Real,
    },
    /// A minorant with an associated primal.
    Minorant {
        /// Index this minorant is tagged with (copied from the sender).
        index: I,
        /// The minorant.
        minorant: Minorant,
        /// The primal information associated with the minorant.
        primal: P,
    },
}

/// Handle through which evaluation results are sent.
///
/// It pairs the sending end of a channel with a fixed `index`. A clone of
/// that index is attached to every objective value and minorant sent
/// through this handle; errors are forwarded without an index.
pub struct ResultSender<I, P, E> {
    /// Index attached to every successful result.
    index: I,
    /// Sending end of the channel the messages are pushed into.
    tx: Sender<Result<EvalResult<I, P>, E>>,
}

impl<I: Clone, P, E> ResultSender<I, P, E> {
    /// Create a sender that tags its results with `index` and forwards
    /// them to `tx`.
    pub fn new(index: I, tx: Sender<Result<EvalResult<I, P>, E>>) -> Self {
        Self { index, tx }
    }

    /// Send a new objective `value`.
    ///
    /// Returns the channel's `SendError` if the message could not be sent.
    pub fn objective(&self, value: Real) -> Result<(), SendError<Result<EvalResult<I, P>, E>>> {
        let index = self.index.clone();
        let message = EvalResult::ObjectiveValue { index, value };
        self.tx.send(Ok(message))
    }

    /// Send a new `minorant` with associated `primal`.
    ///
    /// Returns the channel's `SendError` if the message could not be sent.
    pub fn minorant(&self, minorant: Minorant, primal: P) -> Result<(), SendError<Result<EvalResult<I, P>, E>>> {
        let index = self.index.clone();
        let message = EvalResult::Minorant {
            index,
            minorant,
            primal,
        };
        self.tx.send(Ok(message))
    }

    /// Send the error `err` instead of a result.
    ///
    /// Returns the channel's `SendError` if the message could not be sent.
    pub fn error(&self, err: E) -> Result<(), SendError<Result<EvalResult<I, P>, E>>> {
        let message = Err(err);
        self.tx.send(message)
    }
}

/// Problem update information.
///
/// The solver calls the `update` method of the problem regularly.
/// This method can modify the problem by adding (or moving)
/// variables. The possible updates are encoded in this type.







|


<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<

<
<
<
<
<
<
<
|

|

<
|
|
|
|
<
<
<
<
|
|
|
<
<
<
<
<
|
|
|
<
<







14
15
16
17
18
19
20
21
22
23
















24







25
26
27
28

29
30
31
32




33
34
35





36
37
38


39
40
41
42
43
44
45
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! An asynchronous first-order oracle.

use crate::{Aggregatable, DVector, Minorant, Real};
use crossbeam::channel::Sender;
use std::sync::Arc;

















/// Sink to which the results of an evaluation are sent.
///
/// A value of this type is handed to a problem's `evaluate` method, which
/// reports objective values, minorants and errors through it. How the
/// messages are transported is up to the implementation.
pub trait ResultSender<P: FirstOrderProblem>: Send {
    /// Error returned when a message could not be sent.
    type Err: std::error::Error + Send;

    /// Send a new objective `value`.
    fn objective(&self, value: Real) -> Result<(), Self::Err>;

    /// Send a new `minorant` with associated `primal`.
    fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err>;

    /// Send an error message.
    fn error(&self, err: P::Err) -> Result<(), Self::Err>;
}

/// Problem update information.
///
/// The solver calls the `update` method of the problem regularly.
/// This method can modify the problem by adding (or moving)
/// variables. The possible updates are encoded in this type.
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203

204
205
206
207
208
209
210

    /// Start the evaluation of the i^th subproblem at the given point.
    ///
    /// The results of the evaluation should be passed to the provided sender
    /// `tx`. In order to work correctly, the results must eventually contain
    /// (an upper bound on) the objective value at $y$ as well as at least one
    /// subgradient centered at $y$.
    ///
    /// * `i` - index of the subproblem to evaluate.
    /// * `y` - the point at which the subproblem is evaluated.
    /// * `tx` - the sender receiving the results; `I` is the index type it
    ///   tags its results with.
    ///
    /// Returns an error of the problem's own error type; presumably when
    /// the evaluation could not be started (TODO confirm).
    // NOTE(review): the `where` clause below repeats the `Send + 'static`
    // bounds that are already stated inline on `I`.
    fn evaluate<I: Send + Copy + 'static>(
        &mut self,
        i: usize,
        y: Arc<DVector>,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<(), Self::Err>
    where
        I: Send + 'static;


    /// Called to update the problem.
    ///
    /// This method is called regularly by the solver. The problem should send problem update
    /// information (e.g. adding new variables) to the provided channel.
    ///
    /// The updates might be generated asynchronously.







<
<
<
<
<
|

|
>







154
155
156
157
158
159
160





161
162
163
164
165
166
167
168
169
170
171

    /// Start the evaluation of the i^th subproblem at the given point.
    ///
    /// The results of the evaluation should be passed to the provided result
    /// sender `tx`. In order to work correctly, the results must eventually
    /// contain (an upper bound on) the objective value at $y$ as well as at
    /// least one subgradient centered at $y$.
    ///
    /// * `i` - index of the subproblem to evaluate.
    /// * `y` - the point at which the subproblem is evaluated.
    /// * `tx` - any `ResultSender` for this problem type; objective values,
    ///   minorants and errors are reported through it.
    ///
    /// Returns an error of the problem's own error type; presumably when
    /// the evaluation could not be started (TODO confirm).
    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
    where
        S: ResultSender<Self> + 'static,
        Self: Sized;

    /// Called to update the problem.
    ///
    /// This method is called regularly by the solver. The problem should send problem update
    /// information (e.g. adding new variables) to the provided channel.
    ///
    /// The updates might be generated asynchronously.
Changes to src/solver.rs.
19
20
21
22
23
24
25



pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};

pub mod asyn;

mod masterprocess;









>
>
19
20
21
22
23
24
25
26
27

pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};

pub mod asyn;

mod masterprocess;

mod channels;
Changes to src/solver/asyn.rs.
24
25
26
27
28
29
30

31
32
33
34
35
36
37
38
39
40
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};


use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{EvalResult, FirstOrderProblem, ResultSender, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.







>


|







24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};

use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + 'static,
    T: Terminator<SolverData<P>> + Default,
    W: Weighter<SolverData<P>> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self







|







439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
    T: Terminator<SolverData<P>> + Default,
    W: Weighter<SolverData<P>> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), ResultSender::new(i, self.client_tx.clone().unwrap()))
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();








|







559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();

876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ResultSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    /// Update the problem after descent or null `step`.
    ///







|







877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    /// Update the problem after descent or null `step`.
    ///
Added src/solver/channels.rs.




































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/*
 * Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! Implementation for `ResultSender` using channels.

use crate::problem::{FirstOrderProblem, ResultSender};
use crate::{Minorant, Real};

use crossbeam::channel::{SendError, Sender};

/// Result of evaluating a sub-function.
///
/// Every evaluation produces new information that is made available to
/// the solver and the master problem. There are essentially two kinds
/// of information:
///
///    1. The (exact) function value of a sub-function at some point.
///    2. A minorant of some sub-function.
///
/// `I` is the type of the index each result is tagged with and `P` the
/// type of the primal information attached to a minorant.
#[derive(Debug)]
pub enum EvalResult<I, P> {
    /// The objective value at some point.
    ObjectiveValue {
        /// Index this value is tagged with (copied from the sender).
        index: I,
        /// The objective value itself.
        value: Real,
    },
    /// A minorant with an associated primal.
    Minorant {
        /// Index this minorant is tagged with (copied from the sender).
        index: I,
        /// The minorant.
        minorant: Minorant,
        /// The primal information associated with the minorant.
        primal: P,
    },
}

/// A result sender backed by a channel.
///
/// It pairs the sending end of a channel with a fixed `index`. A clone of
/// that index is attached to every objective value and minorant that is
/// sent; errors are forwarded without an index.
pub struct ChannelSender<I, P, E> {
    /// Index attached to every successful result.
    index: I,
    /// Sending end of the channel the messages are pushed into.
    tx: Sender<Result<EvalResult<I, P>, E>>,
}

impl<I, P, E> ChannelSender<I, P, E> {
    /// Create a sender that tags its results with `index` and forwards
    /// them to `tx`.
    pub fn new(index: I, tx: Sender<Result<EvalResult<I, P>, E>>) -> Self {
        Self { index, tx }
    }
}

/// Channel-based `ResultSender`: every call wraps its arguments in a
/// message and pushes that message into the channel.
impl<I, P> ResultSender<P> for ChannelSender<I, P::Primal, P::Err>
where
    P: FirstOrderProblem,
    P::Err: Send,
    I: Clone + Send,
{
    /// The send error of the underlying channel.
    type Err = SendError<Result<EvalResult<I, P::Primal>, P::Err>>;

    /// Send a new objective `value`, tagged with a clone of the stored index.
    fn objective(&self, value: Real) -> Result<(), Self::Err> {
        let index = self.index.clone();
        self.tx.send(Ok(EvalResult::ObjectiveValue { index, value }))
    }

    /// Send a new `minorant` with associated `primal`, tagged with a clone
    /// of the stored index.
    fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err> {
        let index = self.index.clone();
        self.tx.send(Ok(EvalResult::Minorant {
            index,
            minorant,
            primal,
        }))
    }

    /// Forward the error `err`; no index is attached to it.
    fn error(&self, err: P::Err) -> Result<(), Self::Err> {
        let message = Err(err);
        self.tx.send(message)
    }
}
Changes to src/solver/sync.rs.
23
24
25
26
27
28
29

30
31
32
33
34
35
36
37
38
39
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};


use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{EvalResult, FirstOrderProblem, ResultSender, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.







>


|







23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};

use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self







|







417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), ResultSender::new(i, self.client_tx.clone().unwrap()))
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();








|







537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();

769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ResultSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;







|







770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;