RsBundle  Diff

Differences From Artifact [e268746126]:

  • File src/solver/sync.rs — part of check-in [e12bd4bd5d] at 2020-07-18 18:49:23 on branch async — Merge trunk (user: fifr size: 29477) [more...]

To Artifact [288af33138]:

  • File src/solver/sync.rs — part of check-in [5ea4ec77c8] at 2020-07-20 19:32:43 on branch async — Merge trunk (user: fifr size: 29029) [more...]

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, RecvError};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, RecvError};

use log::{debug, info, warn};
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use threadpool::ThreadPool;

use crate::{DVector, Real};

use super::channels::{
    ChannelResultSender, ChannelUpdateSender, ClientReceiver, ClientSender, EvalResult, Message, Update,
};
use super::masterprocess::{MasterConfig, MasterError, MasterProcess, MasterResponse, Response};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};







|





|







19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, RecvError};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, RecvError};

use log::{debug, info, warn};
use num_traits::{Float, Zero};
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use threadpool::ThreadPool;

use crate::{DVector, Minorant, Real};

use super::channels::{
    ChannelResultSender, ChannelUpdateSender, ClientReceiver, ClientSender, EvalResult, Message, Update,
};
use super::masterprocess::{MasterConfig, MasterError, MasterProcess, MasterResponse, Response};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
pub type Result<T, P, M> = std::result::Result<
    T,
    Error<
        <P as FirstOrderProblem>::Err,
        <<M as MasterBuilder<<P as FirstOrderProblem>::Primal>>::MasterProblem as MasterProblem<
            <P as FirstOrderProblem>::Primal,
        >>::Err,
    >,
>;

impl<PErr, MErr> From<MasterError<PErr, MErr>> for Error<PErr, MErr>
where
    PErr: std::error::Error + 'static,







|
|







84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
pub type Result<T, P, M> = std::result::Result<
    T,
    Error<
        <P as FirstOrderProblem>::Err,
        <<M as MasterBuilder<<P as FirstOrderProblem>::Minorant>>::MasterProblem as MasterProblem<
            <P as FirstOrderProblem>::Minorant,
        >>::Err,
    >,
>;

impl<PErr, MErr> From<MasterError<PErr, MErr>> for Error<PErr, MErr>
where
    PErr: std::error::Error + 'static,
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
    }
}

/// Implementation of a parallel bundle method.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter, M = crate::master::FullMasterBuilder>
where
    P: FirstOrderProblem,
    M: MasterBuilder<P::Primal>,
    P::Err: 'static,
{
    /// Parameters for the solver.
    pub params: Parameters,

    /// Termination predicate.
    pub terminator: T,







|







342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
    }
}

/// Implementation of a parallel bundle method.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter, M = crate::master::FullMasterBuilder>
where
    P: FirstOrderProblem,
    M: MasterBuilder<P::Minorant>,
    P::Err: 'static,
{
    /// Parameters for the solver.
    pub params: Parameters,

    /// Termination predicate.
    pub terminator: T,
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder<P::Primal>,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
    where
        M: Default,
    {
        Self::with_master(problem, M::default())







|







396
397
398
399
400
401
402
403
404
405
406
407
408
409
410

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder<P::Minorant>,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
    where
        M: Default,
    {
        Self::with_master(problem, M::default())
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
            }
        }
    }

    /// Handle a response from a subproblem evaluation.
    ///
    /// The function returns `Ok(true)` if the final iteration count has been reached.
    fn handle_client_response(&mut self, msg: EvalResult<usize, P::Primal, P::Err>) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        match msg {
            EvalResult::ObjectiveValue { index, value } => {
                debug!("Receive objective from subproblem {}: {}", index, value);
                if self.data.nxt_ubs[index].is_infinite() {
                    self.data.cnt_remaining_ubs -= 1;
                }
                self.data.nxt_ubs[index] = self.data.nxt_ubs[index].min(value);
            }
            EvalResult::Minorant { index, mut minorant } => {
                debug!("Receive minorant from subproblem {}", index);
                if self.data.nxt_cutvals[index].is_infinite() {
                    self.data.cnt_remaining_mins -= 1;
                }
                // move center of minorant to cur_y
                minorant.move_center(-1.0, &self.data.nxt_d);
                self.data.nxt_cutvals[index] = self.data.nxt_cutvals[index].max(minorant.constant);
                // add minorant to master problem
                master.add_minorant(index, minorant)?;
            }
            EvalResult::Done { .. } => return Ok(false), // nothing to do here
            EvalResult::Error { err, .. } => return Err(Error::Evaluation(err)),
        }








|
















|







603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
            }
        }
    }

    /// Handle a response from a subproblem evaluation.
    ///
    /// The function returns `Ok(true)` if the final iteration count has been reached.
    fn handle_client_response(&mut self, msg: EvalResult<usize, P::Minorant, P::Err>) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        match msg {
            EvalResult::ObjectiveValue { index, value } => {
                debug!("Receive objective from subproblem {}: {}", index, value);
                if self.data.nxt_ubs[index].is_infinite() {
                    self.data.cnt_remaining_ubs -= 1;
                }
                self.data.nxt_ubs[index] = self.data.nxt_ubs[index].min(value);
            }
            EvalResult::Minorant { index, mut minorant } => {
                debug!("Receive minorant from subproblem {}", index);
                if self.data.nxt_cutvals[index].is_infinite() {
                    self.data.cnt_remaining_mins -= 1;
                }
                // move center of minorant to cur_y
                minorant.move_center(-1.0, &self.data.nxt_d);
                self.data.nxt_cutvals[index] = self.data.nxt_cutvals[index].max(minorant.constant());
                // add minorant to master problem
                master.add_minorant(index, minorant)?;
            }
            EvalResult::Done { .. } => return Ok(false), // nothing to do here
            EvalResult::Error { err, .. } => return Err(Error::Evaluation(err)),
        }

801
802
803
804
805
806
807
808

809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828

829
830
831
832
833
834
835
836
837
838
839
840
841

        let mut have_update = false;
        for msg in update_rx {
            if let Message::Update(update) = msg {
                match update {
                    Update::AddVariables { bounds, sgext, .. } => {
                        have_update = true;
                        let mut newvars = Vec::with_capacity(bounds.len());

                        for (lower, upper) in bounds {
                            if lower > upper {
                                return Err(Error::InvalidBounds { lower, upper });
                            }
                            let value = if lower > 0.0 {
                                lower
                            } else if upper < 0.0 {
                                upper
                            } else {
                                0.0
                            };
                            //self.bounds.push((lower, upper));
                            newvars.push((None, lower - value, upper - value, value));
                        }
                        if !newvars.is_empty() {
                            // modify moved variables
                            for (index, val) in newvars.iter().filter_map(|v| v.0.map(|i| (i, v.3))) {
                                self.data.cur_y[index] = val;
                            }


                            // add new variables
                            self.data
                                .cur_y
                                .extend(newvars.iter().filter(|v| v.0.is_none()).map(|v| v.3));

                            master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1, v.2)).collect(), sgext)?;
                        }
                    }
                    Update::Done { .. } => (), // there's nothing to do
                    Update::Error { err, .. } => return Err(Error::Update(err)),
                }
            } else {
                unreachable!("Only update results allowed during update");







|
>
|
|
<
<
|
|
<
<
|
<
<
|
<
|
<
<
<
<
|
|
>

|
<
<

|







801
802
803
804
805
806
807
808
809
810
811


812
813


814


815

816




817
818
819
820
821


822
823
824
825
826
827
828
829
830

        let mut have_update = false;
        for msg in update_rx {
            if let Message::Update(update) = msg {
                match update {
                    Update::AddVariables { bounds, sgext, .. } => {
                        have_update = true;
                        let newvars = bounds
                            .into_iter()
                            .map(|(lower, upper)| {
                                if lower <= upper {


                                    let value = lower.max(Real::zero()).min(upper);
                                    Ok((lower - value, upper - value, value))


                                } else {


                                    Err(Error::InvalidBounds { lower, upper })

                                }




                            })
                            .collect::<Result<Vec<_>, P, M>>()?;
                        if !newvars.is_empty() {
                            // add new variables
                            self.data.cur_y.extend(newvars.iter().map(|v| v.2));



                            master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1)).collect(), sgext)?;
                        }
                    }
                    Update::Done { .. } => (), // there's nothing to do
                    Update::Error { err, .. } => return Err(Error::Update(err)),
                }
            } else {
                unreachable!("Only update results allowed during update");
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
            self.data.nxt_mod,
            self.data.nxt_val,
            self.data.cur_val
        );
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, P, M> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }
}







|







865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
            self.data.nxt_mod,
            self.data.nxt_val,
            self.data.cur_val
        );
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<<P::Minorant as Minorant>::Primal, P, M> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }
}