RsBundle  Artifact [0aa151db8b]

Artifact 0aa151db8b65630b0064afdbcd8a6aa2cb5d4c6f:

  • File src/mcf/problem.rs — part of check-in [98ffc4729c] at 2019-07-25 14:24:26 on branch mmcf-separation — mmcf: remove debug output (user: fifr size: 19686)

// Copyright (c) 2016, 2017, 2019 Frank Fischer <frank-fischer@shadow-soft.de>
//
// This program is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see  <http://www.gnu.org/licenses/>
//

use crate::mcf;
use crate::parallel::{
    EvalResult, FirstOrderProblem as ParallelProblem, ResultSender, Update as ParallelUpdate, UpdateSender,
    UpdateState as ParallelUpdateState,
};
use crate::{Aggregatable, DVector, FirstOrderProblem, Minorant, Real, SimpleEvaluation, Update, UpdateState};

use itertools::izip;
use log::{debug, warn};
use num_traits::Float;
use threadpool::ThreadPool;

use std::f64::INFINITY;
use std::fmt;
use std::fs::File;
use std::io::Read;
use std::iter;
use std::result;
use std::sync::{Arc, RwLock};

/// An error in the mmcf file format.
#[derive(Debug)]
pub enum MMCFReadError {
    Format(String),
    Solver(mcf::solver::Error),
    Io(std::io::Error),
}

impl fmt::Display for MMCFReadError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
        use MMCFReadError::*;
        match self {
            Format(msg) => write!(fmt, "Format error: {}", msg),
            Solver(err) => err.fmt(fmt),
            Io(err) => err.fmt(fmt),
        }
    }
}

impl std::error::Error for MMCFReadError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use MMCFReadError::*;
        match self {
            Solver(err) => Some(err),
            Io(err) => Some(err),
            _ => None,
        }
    }
}

impl From<mcf::solver::Error> for MMCFReadError {
    fn from(err: mcf::solver::Error) -> MMCFReadError {
        MMCFReadError::Solver(err)
    }
}

impl From<std::io::Error> for MMCFReadError {
    fn from(err: std::io::Error) -> MMCFReadError {
        MMCFReadError::Io(err)
    }
}

impl From<std::num::ParseIntError> for MMCFReadError {
    fn from(err: std::num::ParseIntError) -> MMCFReadError {
        MMCFReadError::Format(format!("Parse error: {}", err))
    }
}

impl From<std::num::ParseFloatError> for MMCFReadError {
    fn from(err: std::num::ParseFloatError) -> MMCFReadError {
        MMCFReadError::Format(format!("Parse error: {}", err))
    }
}

/// Type of errors of the MMCFProblem.
pub type Error = crate::mcf::solver::Error;

/// Result type of the MMCFProblem.
pub type Result<T> = result::Result<T, Error>;

#[derive(Clone, Copy, Debug)]
struct ArcInfo {
    arc: usize,
    src: usize,
    snk: usize,
}

#[derive(Clone, Copy, Debug)]
struct Elem {
    ind: usize,
    val: Real,
}

/// A single MMCF subproblem, i.e. one network.
struct Subproblem {
    net: mcf::Solver,
    lhs: Vec<Vec<Elem>>,
    /// The right-hand side ... might be empty.
    rhs: DVector,
    cbase: DVector,
    c: DVector,
}

unsafe impl Send for Subproblem {}
unsafe impl Sync for Subproblem {}

pub struct MMCFProblem {
    pub multimodel: bool,

    subs: Vec<Arc<RwLock<Subproblem>>>,
    active_constraints: Vec<usize>,
    inactive_constraints: Vec<usize>,
    pool: Option<ThreadPool>,
}

impl Subproblem {
    fn evaluate<I>(&mut self, y: &[Real], active: I) -> Result<(Real, DVector, DVector)>
    where
        I: IntoIterator<Item = usize>,
        I::IntoIter: Clone,
    {
        let active = active.into_iter();
        // lhs and rhs in correct order
        let lhs = &self.lhs;
        let lhs = active.clone().map(|i| &lhs[i]);

        // compute costs
        self.c.clear();
        self.c.extend(self.cbase.iter());
        for (y, lhs) in izip!(y, lhs.clone()) {
            for elem in lhs {
                self.c[elem.ind] += y * elem.val;
            }
        }

        debug!("y={:?}", y);
        debug!("c={}", self.c);
        self.net.set_objective(&self.c)?;

        // solve subproblem
        self.net.solve()?;
        debug!("obj={}", self.net.objective()?);

        // compute minorant
        let (objective, mut subg) = if self.rhs.len() > 0 {
            let rhs = active.clone().map(|i| self.rhs[i]);
            let rhsval = izip!(y, rhs.clone()).map(|(y, r)| y * r).sum::<Real>();
            (rhsval - self.net.objective()?, rhs.collect())
        } else {
            (-self.net.objective()?, dvec![0.0; y.len()])
        };

        let sol = self.net.get_solution()?;
        for (i, lhs) in lhs.enumerate() {
            subg[i] -= lhs.iter().map(|elem| elem.val * sol[elem.ind]).sum::<Real>();
        }

        Ok((objective, subg, sol))
    }

    fn evaluate_constraint(&self, primal: &DVector, cidx: usize) -> Real {
        let rhs = self.rhs.get(cidx).unwrap_or(&0.0);
        let lhs = self.lhs[cidx]
            .iter()
            .map(|elem| elem.val * primal[elem.ind])
            .sum::<Real>();
        rhs - lhs
    }
}

impl MMCFProblem {
    pub fn read_mnetgen(basename: &str) -> std::result::Result<MMCFProblem, MMCFReadError> {
        let mut buffer = String::new();
        {
            let mut f = File::open(&format!("{}.nod", basename))?;
            f.read_to_string(&mut buffer)?;
        }
        let fnod = buffer
            .split_whitespace()
            .map(|x| x.parse::<usize>().unwrap())
            .collect::<Vec<_>>();

        if fnod.len() != 4 {
            return Err(MMCFReadError::Format(format!(
                "Expected 4 numbers in {}.nod, but got {}",
                basename,
                fnod.len()
            )));
        }

        let ncom = fnod[0];
        let nnodes = fnod[1];
        let narcs = fnod[2];
        let ncaps = fnod[3];

        // read nodes
        let mut nets = Vec::with_capacity(ncom);
        for _ in 0..ncom {
            nets.push(mcf::Solver::new(nnodes)?)
        }
        {
            let mut f = File::open(&format!("{}.sup", basename))?;
            buffer.clear();
            f.read_to_string(&mut buffer)?;
        }
        for line in buffer.lines() {
            let mut data = line.split_whitespace();
            let node = data.next().unwrap().parse::<usize>()?;
            let com = data.next().unwrap().parse::<usize>()?;
            let supply = data.next().unwrap().parse::<Real>()?;
            nets[com - 1].set_balance(node - 1, supply)?;
        }

        // read arcs
        let mut arcmap = vec![vec![]; ncom];
        let mut cbase = vec![dvec![]; ncom];

        // lhs nonzeros
        let mut lhsidx = vec![vec![vec![]; ncom]; ncaps];
        {
            let mut f = File::open(&format!("{}.arc", basename))?;
            buffer.clear();
            f.read_to_string(&mut buffer)?;
        }
        for line in buffer.lines() {
            let mut data = line.split_whitespace();
            let arc = data.next().unwrap().parse::<usize>()? - 1;
            let src = data.next().unwrap().parse::<usize>()? - 1;
            let snk = data.next().unwrap().parse::<usize>()? - 1;
            let com = data.next().unwrap().parse::<usize>()? - 1;
            let cost = data.next().unwrap().parse::<Real>()?;
            let cap = data.next().unwrap().parse::<Real>()?;
            let mt = data.next().unwrap().parse::<isize>()? - 1;
            assert!(
                arc < narcs,
                format!("Wrong arc number (got: {}, expected in 1..{})", arc + 1, narcs)
            );
            // set internal coeff
            let coeff = arcmap[com].len();
            arcmap[com].push(ArcInfo {
                arc: arc + 1,
                src: src + 1,
                snk: snk + 1,
            });
            // add arc
            nets[com].add_arc(src, snk, cost, if cap < 0.0 { INFINITY } else { cap })?;
            // set objective
            cbase[com].push(cost); // + 1e-6 * coeff
                                   // add to mutual capacity constraint
            if mt >= 0 {
                lhsidx[mt as usize][com].push(coeff);
            }
        }

        // read rhs of coupling constraints
        {
            let mut f = File::open(&format!("{}.mut", basename))?;
            buffer.clear();
            f.read_to_string(&mut buffer)?;
        }
        let mut rhs = dvec![0.0; ncaps];
        for line in buffer.lines() {
            let mut data = line.split_whitespace();
            let mt = data.next().unwrap().parse::<usize>()? - 1;
            let cap = data.next().unwrap().parse::<Real>()?;
            rhs[mt] = cap;
        }

        // set lhs
        let mut lhs = vec![vec![vec![]; ncaps]; ncom];
        for fidx in 0..ncom {
            for i in 0..ncaps {
                lhs[fidx][i] = lhsidx[i][fidx].iter().map(|&j| Elem { ind: j, val: 1.0 }).collect();
            }
        }

        let subproblems = izip!(nets, cbase, lhs, iter::once(rhs).chain(iter::repeat(dvec![])))
            .map(|(net, cbase, lhs, rhs)| Subproblem {
                net,
                cbase,
                c: dvec![],
                lhs,
                rhs,
            })
            .map(RwLock::new)
            .map(Arc::new)
            .collect();

        Ok(MMCFProblem {
            multimodel: false,
            subs: subproblems,
            active_constraints: vec![],
            inactive_constraints: (0..ncaps).collect(),
            pool: None,
        })
    }

    /// Compute costs for a primal solution.
    pub fn get_primal_costs(&self, fidx: usize, primals: &[DVector]) -> Real {
        let sub = self.subs[fidx].read().unwrap();
        if self.multimodel {
            primals[0].iter().enumerate().map(|(i, x)| x * sub.cbase[i]).sum()
        } else {
            let mut sum = 0.0;
            for p in primals {
                for (i, x) in p.iter().enumerate() {
                    sum += x * sub.cbase[i];
                }
            }
            sum
        }
    }

    /// Aggregate primal vectors.
    pub fn aggregate_primals_ref(&self, primals: &[(Real, &Vec<DVector>)]) -> Vec<DVector> {
        let mut aggr = primals[0]
            .1
            .iter()
            .map(|x| {
                let mut r = dvec![];
                r.scal(primals[0].0, x);
                r
            })
            .collect::<Vec<_>>();

        for &(alpha, primal) in &primals[1..] {
            for (j, x) in primal.iter().enumerate() {
                aggr[j].add_scaled(alpha, x);
            }
        }

        aggr
    }
}

impl Aggregatable for Vec<DVector> {
    fn combine<I, A>(aggregates: I) -> Vec<DVector>
    where
        I: IntoIterator<Item = (Real, A)>,
        A: std::borrow::Borrow<Vec<DVector>>,
    {
        let mut x: Vec<DVector>;
        let mut it = aggregates.into_iter();
        if let Some((alpha, y)) = it.next() {
            x = y.borrow().iter().map(|yi| yi.scaled(alpha)).collect();
        } else {
            return vec![];
        }

        for (alpha, y) in it {
            for i in 0..x.len() {
                x[i].add_scaled(alpha, &y.borrow()[i]);
            }
        }

        x
    }
}

impl FirstOrderProblem for MMCFProblem {
    type Err = Error;

    type Primal = Vec<DVector>;

    type EvalResult = SimpleEvaluation<Vec<DVector>>;

    fn num_variables(&self) -> usize {
        self.active_constraints.len()
    }

    fn lower_bounds(&self) -> Option<Vec<Real>> {
        Some(vec![0.0; self.active_constraints.len()])
    }

    fn upper_bounds(&self) -> Option<Vec<Real>> {
        None
    }

    fn num_subproblems(&self) -> usize {
        if self.multimodel {
            self.subs.len()
        } else {
            1
        }
    }

    fn evaluate(&mut self, fidx: usize, y: &[Real], _nullstep_bound: Real, _relprec: Real) -> Result<Self::EvalResult> {
        let (objective, subg, sol) = if self.multimodel {
            let (objective, subg, sol) = self.subs[fidx]
                .write()
                .unwrap()
                .evaluate(y, self.active_constraints.iter().cloned())?;
            (objective, subg, vec![sol])
        } else {
            let mut objective = 0.0;
            let mut subg = dvec![0.0; y.len()];
            let mut sols = Vec::with_capacity(self.subs.len());
            for sub in &mut self.subs {
                let (obj, sg, sol) = sub
                    .write()
                    .unwrap()
                    .evaluate(y, self.active_constraints.iter().cloned())?;
                objective += obj;
                subg.add_scaled(1.0, &sg);
                sols.push(sol);
            }
            (objective, subg, sols)
        };
        Ok(SimpleEvaluation {
            objective,
            minorants: vec![(
                Minorant {
                    constant: objective,
                    linear: subg,
                },
                sol,
            )],
        })
    }

    fn update(&mut self, state: &UpdateState<Self::Primal>) -> Result<Vec<Update>> {
        let nold = self.active_constraints.len();
        let subs = &self.subs;

        // if state.step != Step::Descent && !self.active_constraints.is_empty() {
        //     return Ok(vec![]);
        // }

        let newconstraints = self
            .inactive_constraints
            .iter()
            .map(|&cidx| {
                subs.iter()
                    .enumerate()
                    .map(|(fidx, sub)| {
                        let primals = state.aggregated_primals(fidx);
                        let aggr = Aggregatable::combine(primals.into_iter().map(|(alpha, x)| (alpha, &x[0])));
                        sub.read().unwrap().evaluate_constraint(&aggr, cidx)
                    })
                    .sum::<Real>()
            })
            .enumerate()
            .filter_map(|(i, sg)| if sg < 1e-3 { Some(i) } else { None })
            .collect::<Vec<_>>();

        let inactive = &mut self.inactive_constraints;
        self.active_constraints
            .extend(newconstraints.into_iter().rev().map(|i| inactive.swap_remove(i)));

        // *** The following code needs `drain_filter`, which is experimental as
        // of rust 1.36 ***

        // self.active_constraints
        //     .extend(self.inactive_constraints.drain_filter(|&mut cidx| {
        //         subs.iter()
        //             .enumerate()
        //             .map(|(fidx, sub)| {
        //                 let primals = state.aggregated_primals(fidx);
        //                 let aggr = Aggregatable::combine(primals.into_iter().map(|(alpha, x)| (alpha, &x[0])));
        //                 sub.read().unwrap().evaluate_constraint(&aggr, cidx)
        //             })
        //             .sum::<Real>() < -1e-3
        //     }));

        Ok(vec![
            Update::AddVariable {
                lower: 0.0,
                upper: Real::infinity()
            };
            self.active_constraints.len() - nold
        ])
    }

    fn extend_subgradient(&mut self, fidx: usize, primal: &Self::Primal, inds: &[usize]) -> Result<Vec<Real>> {
        let sub = self.subs[fidx].read().unwrap();
        Ok(inds
            .iter()
            .map(|&i| sub.evaluate_constraint(&primal[0], self.active_constraints[i]))
            .collect())
    }
}

impl ParallelProblem for MMCFProblem {
    type Err = <Self as FirstOrderProblem>::Err;

    type Primal = DVector;

    fn num_variables(&self) -> usize {
        FirstOrderProblem::num_variables(self)
    }

    fn lower_bounds(&self) -> Option<Vec<Real>> {
        FirstOrderProblem::lower_bounds(self)
    }

    fn num_subproblems(&self) -> usize {
        self.subs.len()
    }

    fn start(&mut self) {
        let pool = ThreadPool::new(4);
        self.pool = Some(pool);
    }

    fn stop(&mut self) {
        self.pool.take();
    }

    fn evaluate<I>(
        &mut self,
        i: usize,
        y: Arc<DVector>,
        index: I,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<()>
    where
        I: Send + Copy + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        let sub = self.subs[i].clone();
        let active_constraints = self.active_constraints.clone();
        self.pool
            .as_ref()
            .unwrap()
            .execute(move || match sub.write().unwrap().evaluate(&y, active_constraints) {
                Ok((objective, subg, primal)) => {
                    tx.send(Ok(EvalResult::ObjectiveValue {
                        index,
                        value: objective,
                    }))
                    .unwrap();
                    tx.send(Ok(EvalResult::Minorant {
                        index,
                        minorant: Minorant {
                            constant: objective,
                            linear: subg,
                        },
                        primal,
                    }))
                    .unwrap();
                }
                Err(err) => tx.send(Err(err)).unwrap(),
            });
        Ok(())
    }

    fn update<I, U>(&mut self, state: &U, index: I, tx: UpdateSender<I, Self::Primal, Self::Err>) -> Result<()>
    where
        U: ParallelUpdateState<Self::Primal>,
    {
        let nold = self.active_constraints.len();
        let subs = &self.subs;

        let newconstraints = self
            .inactive_constraints
            .iter()
            .map(|&cidx| {
                subs.iter()
                    .enumerate()
                    .map(|(fidx, sub)| {
                        let primal = state.aggregated_primal(fidx);
                        sub.read().unwrap().evaluate_constraint(&primal, cidx)
                    })
                    .sum::<Real>()
            })
            .enumerate()
            .filter_map(|(i, sg)| if sg < 1e-3 { Some(i) } else { None })
            .collect::<Vec<_>>();

        let inactive = &mut self.inactive_constraints;
        self.active_constraints
            .extend(newconstraints.into_iter().rev().map(|i| inactive.swap_remove(i)));

        // *** The following code needs `drain_filter`, which is experimental as
        // of rust 1.36 ***

        // self.active_constraints
        //     .extend(self.inactive_constraints.drain_filter(|&mut cidx| {
        //         subs.iter()
        //             .enumerate()
        //             .map(|(fidx, sub)| {
        //                 let primal = state.aggregated_primal(fidx);
        //                 sub.read().unwrap().evaluate_constraint(&primal, cidx)
        //             })
        //             .sum::<Real>() < 1e-3
        //     }));

        let nnew = self.active_constraints.len() - nold;
        if nnew > 0 {
            let actives = self.active_constraints.clone();
            let subs = self.subs.clone();
            if let Err(err) = tx.send(Ok(ParallelUpdate::AddVariables {
                index,
                bounds: vec![(0.0, Real::infinity()); nnew],
                sgext: Box::new(move |fidx: usize, primal: &DVector, inds: &[usize]| {
                    let sub = subs[fidx].read().unwrap();
                    Ok(inds
                        .iter()
                        .map(|&i| sub.evaluate_constraint(primal, actives[i]))
                        .collect())
                }),
            })) {
                warn!("Error sending problem update: {}", err);
            }
        }

        Ok(())
    }
}