// Copyright (c) 2016, 2017, 2019 Frank Fischer <frank-fischer@shadow-soft.de>
//
// This program is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>
//
use crate::mcf;
use crate::problem::{
EvalResult, FirstOrderProblem as ParallelProblem, ResultSender, Update as ParallelUpdate, UpdateSender,
UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};
use itertools::izip;
use log::{debug, warn};
use num_traits::Float;
use threadpool::ThreadPool;
use std::f64::INFINITY;
use std::fmt;
use std::fs::File;
use std::io::Read;
use std::iter;
use std::result;
use std::sync::{Arc, RwLock};
/// An error in the mmcf file format.
#[derive(Debug)]
pub enum MMCFReadError {
Format(String),
MissingNodField(&'static str),
MissingSupField(&'static str),
MissingArcField(&'static str),
MissingMutField(&'static str),
ExtraNodField,
ExtraSupField,
ExtraArcField,
ExtraMutField,
InvalidNode(usize, usize),
InvalidArc(usize, usize),
InvalidCom(usize, usize),
Solver(mcf::solver::Error),
Io(std::io::Error),
}
impl fmt::Display for MMCFReadError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use MMCFReadError::*;
match self {
Format(msg) => write!(fmt, "Format error: {}", msg),
MissingNodField(what) => write!(fmt, "Missing value '{}' in line in node file", what),
MissingSupField(what) => write!(fmt, "Missing value '{}' in line in supply file", what),
MissingArcField(what) => write!(fmt, "Missing value '{}' in line in arc file", what),
MissingMutField(what) => write!(fmt, "Missing value '{}' in line in mut file", what),
ExtraNodField => write!(fmt, "Extra field at the end of line in node file"),
ExtraSupField => write!(fmt, "Extra field at the end of line in supply file"),
ExtraArcField => write!(fmt, "Extra field at the end of line in arc file"),
ExtraMutField => write!(fmt, "Extra field at the end of line in mut file"),
InvalidNode(u, n) => write!(fmt, "Invalid node number {} (must be in 1..{})", u, n),
InvalidArc(u, n) => write!(fmt, "Invalid arc number {} (must be in 1..{})", u, n),
InvalidCom(u, n) => write!(fmt, "Invalid com number {} (must be in 1..{})", u, n),
Solver(err) => err.fmt(fmt),
Io(err) => err.fmt(fmt),
}
}
}
impl std::error::Error for MMCFReadError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use MMCFReadError::*;
match self {
Solver(err) => Some(err),
Io(err) => Some(err),
_ => None,
}
}
}
impl From<mcf::solver::Error> for MMCFReadError {
fn from(err: mcf::solver::Error) -> MMCFReadError {
MMCFReadError::Solver(err)
}
}
impl From<std::io::Error> for MMCFReadError {
fn from(err: std::io::Error) -> MMCFReadError {
MMCFReadError::Io(err)
}
}
impl From<std::num::ParseIntError> for MMCFReadError {
fn from(err: std::num::ParseIntError) -> MMCFReadError {
MMCFReadError::Format(format!("Parse error: {}", err))
}
}
impl From<std::num::ParseFloatError> for MMCFReadError {
fn from(err: std::num::ParseFloatError) -> MMCFReadError {
MMCFReadError::Format(format!("Parse error: {}", err))
}
}
/// Type of errors of the MMCFProblem.
pub type Error = crate::mcf::solver::Error;
/// Result type of the MMCFProblem.
pub type Result<T> = result::Result<T, Error>;
#[derive(Clone, Copy, Debug)]
struct ArcInfo {
arc: usize,
src: usize,
snk: usize,
}
#[derive(Clone, Copy, Debug)]
struct Elem {
ind: usize,
val: Real,
}
/// A single MMCF subproblem, i.e. one network.
struct Subproblem {
net: mcf::Solver,
lhs: Vec<Vec<Elem>>,
/// The right-hand side ... might be empty.
rhs: DVector,
cbase: DVector,
c: DVector,
}
unsafe impl Send for Subproblem {}
unsafe impl Sync for Subproblem {}
pub struct MMCFProblem {
pub multimodel: bool,
subs: Vec<Arc<RwLock<Subproblem>>>,
active_constraints: Arc<RwLock<Vec<usize>>>,
inactive_constraints: Arc<RwLock<Vec<usize>>>,
pool: Option<ThreadPool>,
}
impl Subproblem {
fn evaluate<I>(&mut self, y: &[Real], active: I) -> Result<(Real, DVector, DVector)>
where
I: IntoIterator<Item = usize>,
I::IntoIter: Clone,
{
let active = active.into_iter();
// lhs and rhs in correct order
let lhs = &self.lhs;
let lhs = active.clone().map(|i| &lhs[i]);
// compute costs
self.c.clear();
self.c.extend(self.cbase.iter());
for (y, lhs) in izip!(y, lhs.clone()) {
for elem in lhs {
self.c[elem.ind] += y * elem.val;
}
}
debug!("y={:?}", y);
debug!("c={}", self.c);
self.net.set_objective(&self.c)?;
// solve subproblem
self.net.solve()?;
debug!("obj={}", self.net.objective()?);
// compute minorant
let (objective, mut subg) = if self.rhs.len() > 0 {
let rhs = active.clone().map(|i| self.rhs[i]);
let rhsval = izip!(y, rhs.clone()).map(|(y, r)| y * r).sum::<Real>();
(rhsval - self.net.objective()?, rhs.collect())
} else {
(-self.net.objective()?, dvec![0.0; y.len()])
};
let sol = self.net.get_solution()?;
for (i, lhs) in lhs.enumerate() {
subg[i] -= lhs.iter().map(|elem| elem.val * sol[elem.ind]).sum::<Real>();
}
Ok((objective, subg, sol))
}
fn evaluate_constraint(&self, primal: &DVector, cidx: usize) -> Real {
let rhs = self.rhs.get(cidx).unwrap_or(&0.0);
let lhs = self.lhs[cidx]
.iter()
.map(|elem| elem.val * primal[elem.ind])
.sum::<Real>();
rhs - lhs
}
}
impl MMCFProblem {
pub fn num_subproblems(&self) -> usize {
self.subs.len()
}
pub fn read_mnetgen(basename: &str) -> std::result::Result<MMCFProblem, MMCFReadError> {
use MMCFReadError::*;
let mut buffer = String::new();
{
let mut f = File::open(&format!("{}.nod", basename))?;
f.read_to_string(&mut buffer)?;
}
let ncom;
let nnodes;
let narcs;
let ncaps;
{
let mut data = buffer.split_whitespace();
ncom = data.next().ok_or_else(|| MissingNodField("ncom"))?.parse::<usize>()?;
nnodes = data.next().ok_or_else(|| MissingNodField("nnodes"))?.parse::<usize>()?;
narcs = data.next().ok_or_else(|| MissingNodField("narcs"))?.parse::<usize>()?;
ncaps = data.next().ok_or_else(|| MissingNodField("ncaps"))?.parse::<usize>()?;
if data.next().is_some() {
return Err(ExtraNodField);
}
}
// read nodes
let mut nets = Vec::with_capacity(ncom);
for _ in 0..ncom {
nets.push(mcf::Solver::new(nnodes)?)
}
{
let mut f = File::open(&format!("{}.sup", basename))?;
buffer.clear();
f.read_to_string(&mut buffer)?;
}
for line in buffer.lines() {
let mut data = line.split_whitespace();
let node = data.next().ok_or_else(|| MissingSupField("node"))?.parse::<usize>()?;
let com = data.next().ok_or_else(|| MissingSupField("com"))?.parse::<usize>()?;
let supply = data.next().ok_or_else(|| MissingSupField("supply"))?.parse::<Real>()?;
if data.next().is_some() {
return Err(ExtraSupField);
}
nets[com - 1].set_balance(node - 1, supply)?;
}
// read arcs
let mut arcmap = vec![vec![]; ncom];
let mut cbase = vec![dvec![]; ncom];
// lhs nonzeros
let mut lhsidx = vec![vec![vec![]; ncom]; ncaps];
{
let mut f = File::open(&format!("{}.arc", basename))?;
buffer.clear();
f.read_to_string(&mut buffer)?;
}
for line in buffer.lines() {
let mut data = line.split_whitespace();
let arc = data.next().ok_or_else(|| MissingArcField("arc"))?.parse::<usize>()?;
let src = data.next().ok_or_else(|| MissingArcField("src"))?.parse::<usize>()?;
let snk = data.next().ok_or_else(|| MissingArcField("snk"))?.parse::<usize>()?;
let com = data.next().ok_or_else(|| MissingArcField("com"))?.parse::<usize>()?;
let cost = data.next().ok_or_else(|| MissingArcField("cost"))?.parse::<Real>()?;
let cap = data.next().ok_or_else(|| MissingArcField("cap"))?.parse::<Real>()?;
let mt = data.next().ok_or_else(|| MissingArcField("mt"))?.parse::<isize>()?;
if data.next().is_some() {
return Err(ExtraArcField);
}
if arc < 1 || arc > narcs {
return Err(InvalidArc(arc, narcs));
}
if src < 1 || src > nnodes {
return Err(InvalidNode(src, nnodes));
}
if snk < 1 || snk > nnodes {
return Err(InvalidNode(snk, nnodes));
}
if com < 1 || com > ncom {
return Err(InvalidCom(com, ncom));
}
let com = com - 1;
// set internal coeff
let coeff = arcmap[com].len();
arcmap[com].push(ArcInfo { arc, src, snk });
// add arc
nets[com].add_arc(src - 1, snk - 1, cost, if cap < 0.0 { INFINITY } else { cap })?;
// set objective
cbase[com].push(cost); // + 1e-6 * coeff
// add to mutual capacity constraint
if mt > 0 {
lhsidx[(mt - 1) as usize][com].push(coeff);
}
}
// read rhs of coupling constraints
{
let mut f = File::open(&format!("{}.mut", basename))?;
buffer.clear();
f.read_to_string(&mut buffer)?;
}
let mut rhs = dvec![0.0; ncaps];
for line in buffer.lines() {
let mut data = line.split_whitespace();
let mt = data.next().ok_or_else(|| MissingMutField("mt"))?.parse::<usize>()? - 1;
let cap = data.next().ok_or_else(|| MissingMutField("cap"))?.parse::<Real>()?;
if data.next().is_some() {
return Err(ExtraMutField);
}
rhs[mt] = cap;
}
// set lhs
let lhs = (0..ncom)
.map(|fidx| {
(0..ncaps)
.map(|i| lhsidx[i][fidx].iter().map(|&j| Elem { ind: j, val: 1.0 }).collect())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let subproblems = izip!(nets, cbase, lhs, iter::once(rhs).chain(iter::repeat(dvec![])))
.map(|(net, cbase, lhs, rhs)| Subproblem {
net,
cbase,
c: dvec![],
lhs,
rhs,
})
.map(RwLock::new)
.map(Arc::new)
.collect();
Ok(MMCFProblem {
multimodel: false,
subs: subproblems,
active_constraints: Arc::new(RwLock::new((0..ncaps).collect())),
inactive_constraints: Arc::new(RwLock::new(vec![])),
pool: None,
})
}
/// Mark all constraints as being separated.
pub fn set_separate_constraints(&mut self, separate: bool) {
let nconstrs = self.subs[0].read().unwrap().lhs.len();
if separate {
self.active_constraints.write().unwrap().clear();
self.inactive_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
} else {
self.active_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
self.inactive_constraints.write().unwrap().clear();
}
}
/// Compute costs for a primal solution.
pub fn get_primal_costs(&self, fidx: usize, primals: &[DVector]) -> Real {
let sub = self.subs[fidx].read().unwrap();
if self.multimodel {
primals[0].iter().enumerate().map(|(i, x)| x * sub.cbase[i]).sum()
} else {
let mut sum = 0.0;
for p in primals {
for (i, x) in p.iter().enumerate() {
sum += x * sub.cbase[i];
}
}
sum
}
}
/// Aggregate primal vectors.
pub fn aggregate_primals_ref(&self, primals: &[(Real, &Vec<DVector>)]) -> Vec<DVector> {
let mut aggr = primals[0]
.1
.iter()
.map(|x| {
let mut r = dvec![];
r.scal(primals[0].0, x);
r
})
.collect::<Vec<_>>();
for &(alpha, primal) in &primals[1..] {
for (j, x) in primal.iter().enumerate() {
aggr[j].add_scaled(alpha, x);
}
}
aggr
}
}
impl ParallelProblem for MMCFProblem {
type Err = Error;
type Primal = DVector;
fn num_variables(&self) -> usize {
self.active_constraints.read().unwrap().len()
}
fn lower_bounds(&self) -> Option<Vec<Real>> {
Some(vec![0.0; self.num_variables()])
}
fn num_subproblems(&self) -> usize {
self.subs.len()
}
fn start(&mut self) {
let pool = ThreadPool::new(4);
self.pool = Some(pool);
}
fn stop(&mut self) {
self.pool.take();
}
fn evaluate<I>(
&mut self,
i: usize,
y: Arc<DVector>,
index: I,
tx: ResultSender<I, Self::Primal, Self::Err>,
) -> Result<()>
where
I: Send + Copy + 'static,
{
if self.pool.is_none() {
self.start()
}
let y = y.clone();
let sub = self.subs[i].clone();
// Attention: `y` might be shorter than the set of active constraints
// because evaluation and problem update are not synchronized, i.e. the
// evaluation may still use an older model with less constraints.
let active_constraints = self.active_constraints.read().unwrap()[0..y.len()].to_vec();
self.pool
.as_ref()
.unwrap()
.execute(move || match sub.write().unwrap().evaluate(&y, active_constraints) {
Ok((objective, subg, primal)) => {
tx.send(Ok(EvalResult::ObjectiveValue {
index,
value: objective,
}))
.unwrap();
tx.send(Ok(EvalResult::Minorant {
index,
minorant: Minorant {
constant: objective,
linear: subg,
},
primal,
}))
.unwrap();
}
Err(err) => tx.send(Err(err)).unwrap(),
});
Ok(())
}
fn update<I, U>(&mut self, state: U, index: I, tx: UpdateSender<I, Self::Primal, Self::Err>) -> Result<()>
where
U: ParallelUpdateState<Self::Primal>,
I: Send + 'static,
{
if self.inactive_constraints.read().unwrap().is_empty() {
return Ok(());
}
let inactive_constraints = self.inactive_constraints.clone();
let active_constraints = self.active_constraints.clone();
let nold = self.active_constraints.read().unwrap().len();
let subs = self.subs.clone();
self.pool.as_ref().unwrap().execute(move || {
let newconstraints = inactive_constraints
.read()
.unwrap()
.iter()
.map(|&cidx| {
subs.iter()
.enumerate()
.map(|(fidx, sub)| {
let primal = state.aggregated_primal(fidx);
sub.read().unwrap().evaluate_constraint(&primal, cidx)
})
.sum::<Real>()
})
.enumerate()
.filter_map(|(i, sg)| if sg < 1e-3 { Some(i) } else { None })
.collect::<Vec<_>>();
let mut inactive = inactive_constraints.write().unwrap();
let mut active = active_constraints.write().unwrap();
active.extend(newconstraints.into_iter().rev().map(|i| inactive.swap_remove(i)));
// *** The following code needs `drain_filter`, which is experimental as
// of rust 1.36 ***
// active
// .extend(inactive.drain_filter(|&mut cidx| {
// subs.iter()
// .enumerate()
// .map(|(fidx, sub)| {
// let primal = state.aggregated_primal(fidx);
// sub.read().unwrap().evaluate_constraint(&primal, cidx)
// })
// .sum::<Real>() < 1e-3
// }));
let nnew = active.len() - nold;
if nnew > 0 {
let actives = active.clone();
if let Err(err) = tx.send(Ok(ParallelUpdate::AddVariables {
index,
bounds: vec![(0.0, Real::infinity()); nnew],
sgext: Box::new(move |fidx: usize, primal: &DVector, inds: &[usize]| {
let sub = subs[fidx].read().unwrap();
Ok(inds
.iter()
.map(|&i| sub.evaluate_constraint(primal, actives[i]))
.collect())
}),
})) {
warn!("Error sending problem update: {}", err);
}
}
});
Ok(())
}
}