use anyhow::{bail, ensure, Context, Result};
use axum::{
extract::State,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Router,
};
use bytes::Bytes;
use hex::FromHex;
use log::{debug, error, info, LevelFilter};
use rand::distributions::{Alphanumeric, DistString};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use sailfish::TemplateOnce;
use serde::Deserialize;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use std::{collections::HashMap, time::Instant};
use structopt::StructOpt;
use systemd_journal_logger::JournalLog;
use tokio::fs;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio::time::{self, sleep, Duration};
use libreloc_shared::*;
use geohash::Coord;
/// Shared application state handed to the HTTP handlers and the update loop.
#[derive(Clone)]
struct AppState {
    // Parsed client configuration (upload/fetch URLs, listen address).
    conf: Conf,
    // Latest known state for each location source, keyed by kind.
    location_state: Arc<Mutex<HashMap<SrcKind, Source>>>,
    // Most recent fused location estimate.
    location: Arc<Mutex<Plocation>>,
}
/// Last known reading from a single location source.
#[derive(Debug, Clone)]
struct Source {
    // Human-readable source name (Debug form of its SrcKind).
    name: String,
    // Most recent probabilistic location reported by this source.
    last_plocation: Plocation,
    // Monotonic time of the last update from this source.
    last_update_time: Instant,
}
/// The kinds of location sources feeding the location fuser.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
enum SrcKind {
    // Satellite fix via the gpsd reader.
    // NOTE(review): also (re)used by geoip_prober and gsm_country_code_reader,
    // so those sources overwrite each other's slot — likely needs new variants.
    Gnss,
    // Country-level hint derived from the GSM Mobile Country Code.
    GsmCountry,
    // Position derived from nearby WiFi/Bluetooth emitters.
    WifiBt,
}
/// A single location update flowing from a source worker to the fuser.
#[derive(Debug)]
struct Update {
    // The source's probabilistic location estimate.
    plocation: Plocation,
    // Optional source-specific payload.
    extra: Option<String>,
    // Which source produced this update.
    src_kind: SrcKind,
    // Monotonic timestamp of the reading.
    last_update_time: Instant,
}
// Channel endpoints carrying Updates from the workers to the main loop.
type Sender = tokio::sync::mpsc::Sender<Update>;
type Receiver = tokio::sync::mpsc::Receiver<Update>;
/// Client configuration, deserialized from /etc/libreloc_client.json
/// (or the file named by the CONF_FN environment variable).
#[derive(Deserialize, Debug, Clone)]
struct Conf {
    // Config schema version; must be 1 (validated in load_conf).
    conf_version: u8,
    // Endpoint for uploading packed bloom-hash blobs.
    upload_url: Option<String>,
    // Base URL for fetching bloom hashes during location lookups.
    fetch_url: Option<String>,
    // Socket address for the local HTTP API (defaults to 127.0.0.1:3939).
    listen_addr: Option<String>,
}
/// Load and validate the client configuration.
///
/// The path is taken from the `CONF_FN` environment variable, falling back
/// to `/etc/libreloc_client.json`. Errors if the file cannot be read, is
/// not valid JSON, or declares an unsupported `conf_version`.
fn load_conf() -> Result<Conf> {
    let conf_fn =
        std::env::var("CONF_FN").unwrap_or_else(|_| "/etc/libreloc_client.json".to_string());
    info!("[main] reading {}", conf_fn);
    let raw = std::fs::read_to_string(&conf_fn)
        .with_context(|| format!("Unable to read config file '{}'", &conf_fn))?;
    let parsed: Conf = serde_json::from_str(&raw)
        .with_context(|| format!("Unable to parse configuration in '{}'", &conf_fn))?;
    debug!("[main] conf loaded");
    ensure!(
        parsed.conf_version == 1,
        "Invalid configuration format version. Must be 1"
    );
    Ok(parsed)
}
/// One TPV (time-position-velocity) record from the gpsd JSON stream.
/// All position fields are optional until a fix is acquired.
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct Tpv {
    class: String,
    device: Option<String>,
    // Fix mode — per gpsd TPV convention (no fix / 2D / 3D); confirm.
    mode: Option<u8>,
    time: Option<String>,
    // Estimated timestamp error.
    ept: Option<f64>,
    lat: Option<f64>,
    lon: Option<f64>,
    alt: Option<f64>,
    // Estimated longitude/latitude/vertical errors — presumably meters
    // per the gpsd TPV schema; confirm.
    epx: Option<f64>,
    epy: Option<f64>,
    epv: Option<f64>,
}
/// Envelope around the TPV records returned by the gpsd endpoint.
#[derive(Deserialize, Debug)]
struct GpsdResponse {
    tpv: Vec<Tpv>,
}
async fn gnss_reader(chan: Sender) -> Result<()> {
const URL: &str = "http://localhost:2947/stream?device=json";
let reqc = reqwest::Client::new();
loop {
let resp: GpsdResponse = reqc.get(URL).send().await?.json().await?;
if let Some(tpv) = resp.tpv.first() {
if let (Some(lat), Some(lon), Some(accuracy)) = (tpv.lat, tpv.lon, tpv.epx) {
let up = Update {
plocation: Plocation {
loc: Coord { x: lon, y: lat },
lsm: accuracy as f32,
},
extra: None,
src_kind: SrcKind::Gnss,
last_update_time: Instant::now(),
};
chan.send(up).await.unwrap();
} else {
info!("[gnss] Location data is not available yet.");
}
}
sleep(Duration::from_secs(5)).await;
}
}
/// Look up this host's public IP address via ifconfig.co.
///
/// The response body is validated to parse as an `IpAddr` before being
/// returned as a trimmed string, so garbage (captive portals, HTML error
/// pages) is rejected early.
async fn fetch_public_ipaddr(reqc: &reqwest::Client) -> Result<String> {
    const IPADDR_LOOKUP_URL: &str = "https://ifconfig.co/ip";
    let resp = reqc
        .get(IPADDR_LOOKUP_URL)
        .header(reqwest::header::USER_AGENT, "libreloc-client")
        .send()
        .await
        .context("Failed to send request")?;
    if !resp.status().is_success() {
        return Err(anyhow::anyhow!("Request failed"));
    }
    let body = resp.text().await?;
    let trimmed = body.trim();
    // Validation only; the parsed value itself is unused.
    let _: IpAddr = trimmed.parse()?;
    Ok(trimmed.to_string())
}
/// Location payload of a RIPE IPmap "best" response.
#[derive(Deserialize, Debug)]
struct RipeIpmapBestRespLoc {
    latitude: f64,
    longitude: f64,
}
/// Top-level RIPE IPmap "best" response; `location` is absent when the
/// service cannot place the address.
#[derive(Deserialize, Debug)]
struct RipeIpmapBestResp {
    location: Option<RipeIpmapBestRespLoc>,
}
/// Geolocate the current public IP address via RIPE IPmap's "best"
/// endpoint, returning the reported coordinates.
async fn geoip_prober_one(reqc: &reqwest::Client) -> Result<Coord> {
    let ipaddr = fetch_public_ipaddr(reqc).await?;
    let url = format!(
        "https://ipmap-api.ripe.net/v1/locate/{}/best?client=libreloc-client",
        ipaddr
    );
    debug!("[geoip] fetching {}", url);
    let resp = reqc
        .get(url)
        .header(reqwest::header::USER_AGENT, "libreloc-client")
        .send()
        .await
        .context("Failed to send request")?;
    if !resp.status().is_success() {
        return Err(anyhow::anyhow!("Request failed"));
    }
    debug!("[geoip] decoding json");
    let decoded: RipeIpmapBestResp = resp.json().await?;
    debug!("[geoip] received {:?}", decoded);
    // Treat a missing location as a failed request.
    let Some(loc) = decoded.location else {
        return Err(anyhow::anyhow!("Request failed"));
    };
    Ok(Coord {
        y: loc.latitude,
        x: loc.longitude,
    })
}
/// Periodically (hourly) geolocate via public-IP lookup and publish a
/// very coarse (~500 km uncertainty) location estimate on `chan`.
async fn geoip_prober(chan: Sender) {
    let reqc = reqwest::Client::new();
    loop {
        let c = match geoip_prober_one(&reqc).await {
            Ok(c) => c,
            Err(e) => {
                error!("[geoip] {:?}", e);
                // Back off for 4 hours on failure (no connectivity, API down).
                time::sleep(Duration::from_secs(3600 * 4)).await;
                continue;
            }
        };
        let up = Update {
            plocation: Plocation {
                loc: c,
                // Uncertainty of an IP-based estimate.
                lsm: 500_000.0, },
            extra: None,
            // NOTE(review): this reuses SrcKind::Gnss, so GeoIP estimates and
            // real GNSS fixes overwrite each other in the per-source map —
            // likely needs a dedicated SrcKind variant.
            src_kind: SrcKind::Gnss,
            last_update_time: Instant::now(),
        };
        chan.send(up).await.unwrap();
        time::sleep(Duration::from_secs(3600)).await;
    }
}
/// Query NetworkManager for the GSM operator id and return the MCC
/// (its first three digits), if a modem with an operator is present.
///
/// Returns `Ok(None)` when no GSM device/operator is available, and an
/// error when `nmcli` itself fails or the operator id is malformed.
async fn fetch_gsm_mcc() -> Result<Option<String>> {
    let p = Command::new("nmcli")
        .args(["-t", "-f", "GSM.OPERATOR.ID", "device", "status"])
        .output()
        .await
        .context("Failed to run nmcli")?;
    if !p.status.success() {
        let stderr = String::from_utf8_lossy(&p.stderr);
        // nmcli mentions the requested field in its error output when no GSM
        // device exists; treat that as "no MCC" rather than a hard failure.
        if stderr.contains("GSM.OPERATOR.ID") {
            return Ok(None);
        }
        bail!("nmcli command failed with: {}", stderr);
    }
    let stdout = String::from_utf8_lossy(&p.stdout);
    let op_id = stdout.trim();
    if op_id.is_empty() {
        return Ok(None);
    }
    // BUGFIX: the MCC is the first 3 bytes of the operator id; the previous
    // `&op_id[0..3]` panicked when nmcli returned a shorter/truncated id.
    match op_id.get(0..3) {
        Some(mcc) => Ok(Some(mcc.into())),
        None => bail!("unexpected GSM operator id '{}'", op_id),
    }
}
/// Periodically read the GSM Mobile Country Code and publish it as a
/// coarse location hint on `chan`. Backs off for a day when no GSM
/// modem/operator is available.
async fn gsm_country_code_reader(chan: tokio::sync::mpsc::Sender<Update>) {
    loop {
        info!("[gsm] gsm MCC");
        let mcc = match fetch_gsm_mcc().await {
            Ok(Some(mcc)) => mcc,
            Ok(None) | Err(_) => {
                debug!("[gsm] Unable to fetch GSM MCC");
                time::sleep(Duration::from_secs(3600 * 24)).await;
                continue;
            }
        };
        info!("[gsm] MCC: {}", mcc);
        let up = Update {
            // No coordinates available from the MCC alone.
            plocation: NOWHERE,
            // BUGFIX: the fetched MCC was previously discarded (extra: None).
            extra: Some(mcc),
            // BUGFIX: was SrcKind::Gnss, which clobbered the real GNSS
            // source's slot in the per-source location map.
            src_kind: SrcKind::GsmCountry,
            last_update_time: Instant::now(),
        };
        chan.send(up).await.unwrap();
        time::sleep(Duration::from_secs(60 * 30)).await;
    }
}
/// Scan for nearby WiFi access points with `nmcli` and return them as
/// blips: (zeroed coordinates, MAC address, decoded SSID).
async fn scan_wifi() -> Result<Blips> {
    debug!("[wifi_bt] running nmcli");
    let output = tokio::process::Command::new("nmcli")
        .args([
            "--fields",
            "SSID-HEX,BSSID,CHAN,SIGNAL,SECURITY",
            "--colors",
            "no",
            "--escape",
            "no",
            "device",
            "wifi",
            "list",
        ])
        .output()
        .await
        .context("Failed to execute nmcli")?;
    if !output.status.success() {
        info!("[wifi_bt] nmcli failed");
        anyhow::bail!("nmcli command failed");
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    let mut blips: Blips = vec![];
    for line in stdout.lines() {
        let columns: Vec<&str> = line.split_whitespace().collect();
        // Expect at least: SSID-HEX BSSID CHAN SIGNAL SECURITY
        if columns.len() < 5 {
            continue;
        }
        let hex_ssid = columns[0];
        // Skip the header row.
        if hex_ssid == "SSID-HEX" {
            continue;
        }
        let raw_mac = columns[1];
        let signal = columns[3];
        let security = columns[4];
        // Rows whose SSID column is not valid hex are silently skipped.
        let Ok(ssid_bytes) = Vec::from_hex(hex_ssid) else {
            continue;
        };
        debug!("[wifi_bt] converting {raw_mac} {signal} {security} {hex_ssid}");
        let ssid = String::from_utf8_lossy(&ssid_bytes).into_owned();
        let Ok(macaddr) = parse_macaddr(raw_mac) else {
            continue;
        };
        // Coordinates are unknown at scan time; filled by the locator.
        blips.push((Coord { x: 0., y: 0. }, macaddr, ssid));
    }
    Ok(blips)
}
/// Repeatedly scan for WiFi emitters and ask the remote service for a
/// position fix, publishing successful results on `chan`.
async fn wifi_bt_location_fetcher(chan: tokio::sync::mpsc::Sender<Update>, url: String) {
    loop {
        info!("[wifi_bt] scan and locate cycle started");
        match scan_wifi().await {
            Ok(blips) => {
                info!("[wifi_bt] {} nearby devices detected", blips.len());
                match geolocate_blips(&url, blips).await {
                    Ok(plocation) => {
                        let up = Update {
                            plocation,
                            extra: None,
                            src_kind: SrcKind::WifiBt,
                            last_update_time: Instant::now(),
                        };
                        chan.send(up).await.unwrap();
                    }
                    Err(_) => info!("[wifi_bt] failed to geolocate WiFi emitters"),
                }
            }
            Err(_) => info!("[wifi_bt] failed to scan for WiFi emitters"),
        }
        time::sleep(Duration::from_secs(2)).await;
    }
}
/// Degrade a past location's accuracy based on its age, assuming the
/// device may have moved since `past`. Returns NOWHERE if the system
/// clock reports `past` as being in the future.
fn calc_accuracy_from_last_plocation(pl: &Plocation, past: SystemTime) -> Plocation {
    // Assumed maximum travel speed, presumably km/h — confirm units.
    const KPH: f32 = 200.;
    if let Ok(delta) = SystemTime::now().duration_since(past) {
        let hours = delta.as_secs() as f32 / 3600.;
        // NOTE(review): this MULTIPLIES the old uncertainty by (hours * KPH);
        // an additive model (lsm + distance traveled) seems more plausible if
        // lsm is a distance — confirm intended formula and units.
        let lsm = pl.lsm * hours * KPH;
        Plocation { loc: pl.loc, lsm }
    } else {
        NOWHERE
    }
}
/// Spawn the background location-source workers, each feeding updates
/// into `tx`.
///
/// Panics with an explicit message if `fetch_url` is missing from the
/// configuration, since the WiFi locator cannot work without it
/// (previously a bare `unwrap()` with a redundant double clone).
fn spawn_workers(conf: Conf, tx: Sender) {
    tokio::spawn(gnss_reader(tx.clone()));
    tokio::spawn(geoip_prober(tx.clone()));
    tokio::spawn(gsm_country_code_reader(tx.clone()));
    // `conf` is owned, so the URL can be moved out directly.
    let url = conf
        .fetch_url
        .expect("fetch_url must be set in the configuration");
    tokio::spawn(wifi_bt_location_fetcher(tx, url));
}
/// Command-line interface of libreloc_client.
#[derive(structopt::StructOpt, Debug)]
#[structopt(name = "libreloc_client")]
enum Cli {
    // Upload a NeoStumbler CSV export to the configured server.
    UploadCSV {
        #[structopt(parse(from_os_str))]
        filename: std::path::PathBuf,
    },
    // Locate from a live scan (not implemented yet — see locate_from_cli).
    Locate,
    // Geolocate using the WiFi rows of a CSV export.
    LocateCSV {
        #[structopt(parse(from_os_str))]
        filename: std::path::PathBuf,
    },
    // Run as the long-lived location service daemon (with HTTP API).
    Run,
    // Upload randomly-generated data to benchmark the server.
    UploadRandomDataBench,
}
/// Drain location updates from `rx`, record each one in the per-source
/// map, and refresh the fused global location after every update.
/// Returns once all senders have been dropped.
async fn receive_updates_until_exit(
    mut rx: Receiver,
    location_state: Arc<Mutex<HashMap<SrcKind, Source>>>,
    global_loc: Arc<Mutex<Plocation>>,
) -> Result<()> {
    while let Some(update) = rx.recv().await {
        let mut state = location_state.lock().unwrap();
        // Every SrcKind entry is pre-populated in main(), so this lookup
        // cannot fail.
        let src = state.get_mut(&update.src_kind).unwrap();
        src.last_plocation = update.plocation;
        src.last_update_time = update.last_update_time;
        info!("[main] fusing {:?}", &src);
        let locs = state.values().map(|s| s.last_plocation).collect();
        let fused = fuse_location_sources(locs);
        let mut g = global_loc.lock().unwrap();
        g.loc = fused.loc;
        g.lsm = fused.lsm;
        info!("[main] Location: {:?}", g);
    }
    Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
    // BUGFIX: the CLI was previously parsed twice (Cli::from_args() called
    // again mid-function); parse once and match by reference.
    let cli_cmd: Cli = Cli::from_args();
    if let Cli::Run = cli_cmd {
        JournalLog::new()?.install()?;
    } else {
        simple_logger::SimpleLogger::new()
            // BUGFIX: the module name was misspelled "rewqest", so reqwest's
            // verbose logging was never actually silenced.
            .with_module_level("reqwest", LevelFilter::Warn)
            .init()?;
    }
    log::set_max_level(LevelFilter::Debug);
    info!("[main] starting");
    let conf = load_conf().unwrap_or_else(|err| {
        error!("{}", err);
        std::process::exit(1)
    });
    match &cli_cmd {
        Cli::UploadCSV { filename } => upload_csv(&conf, filename).await?,
        Cli::LocateCSV { filename } => locate_from_csv(&conf, filename).await?,
        Cli::Locate => locate_from_cli(&conf).await?,
        Cli::Run => {}
        Cli::UploadRandomDataBench => upload_random_data_bench(&conf).await?,
    }
    // Only the long-running service continues past this point.
    if !matches!(cli_cmd, Cli::Run) {
        return Ok(());
    }
    let app_state = AppState {
        conf: conf.clone(),
        location_state: Arc::new(Mutex::new(HashMap::new())),
        location: Arc::new(Mutex::new(NOWHERE)),
    };
    // Pre-populate one Source slot per kind so the update loop can assume
    // each entry exists.
    {
        let mut location_state = app_state.location_state.lock().unwrap();
        for v in [SrcKind::Gnss, SrcKind::GsmCountry, SrcKind::WifiBt] {
            location_state.insert(
                v.clone(),
                Source {
                    name: format!("{:?}", v),
                    last_plocation: NOWHERE,
                    last_update_time: Instant::now(),
                },
            );
        }
    }
    let location_state = app_state.location_state.clone();
    let gloc = app_state.location.clone();
    tokio::spawn(run_location_service_api(conf.clone(), app_state));
    let (tx, rx) = mpsc::channel(10);
    spawn_workers(conf.clone(), tx);
    receive_updates_until_exit(rx, location_state, gloc).await?;
    Ok(())
}
/// Resolve one geohash segment for a set of WiFi blips.
///
/// For each blip, its 128-byte bloom hash is fetched — from the on-disk
/// cache when possible, from the remote service otherwise — then the
/// blooms are piled up and the most selective segment is picked.
///
/// Returns `(selectiveness, segment)` where `segment` extends `geopath`.
pub async fn cached_remote_lookup(
    reqc: &reqwest::Client,
    baseurl: &str,
    geopath: &str,
    blips: &Blips,
    stats: &mut CacheStats,
) -> Result<(f32, String)> {
    let cache = Cache::new().await?;
    let mut blooms: Vec<BloomHash> = Vec::with_capacity(blips.len());
    for (_, mac, ssid) in blips {
        let fingerprint = generate_fingerprint_wifi(geopath, mac, ssid);
        stats.tot_cnt += 1;
        // BUGFIX: a corrupt cache entry (wrong size) is now treated as a
        // cache miss and refetched; previously it aborted the whole lookup
        // with ensure!, permanently poisoning that fingerprint.
        let cached: Option<[u8; 128]> = cache
            .read_data(&fingerprint)
            .await
            .and_then(|blob| blob.try_into().ok());
        let bloom = if let Some(array) = cached {
            stats.hit_cnt += 1;
            BloomHash::new(array)
        } else {
            let url = format!("{}/{}", baseurl, fingerprint);
            let resp = reqc.get(url).send().await?;
            ensure!(resp.status().is_success());
            let bytes = resp.bytes().await?;
            ensure!(bytes.len() == 128);
            cache.store_data(&fingerprint, &bytes).await?;
            let array: [u8; 128] = bytes.as_ref().try_into()?;
            BloomHash::new(array)
        };
        blooms.push(bloom);
    }
    let (selectiveness, b) = pick_best_segment(pile_up_blooms(blooms));
    Ok((selectiveness, geosegment_to_string(b)))
}
/// Hit/total counters for the on-disk bloom-hash cache.
pub struct CacheStats {
    // Number of lookups served from disk.
    hit_cnt: i32,
    // Total number of lookups attempted.
    tot_cnt: i32,
}
/// Read a NeoStumbler CSV export and geolocate based on its WiFi rows.
async fn locate_from_csv(conf: &Conf, fname: &PathBuf) -> Result<()> {
    ensure!(conf.fetch_url.is_some(), "Data fetch URL not configured");
    let file = std::fs::File::open(fname)?;
    let mut rdr = csv::ReaderBuilder::new().from_reader(file);
    let mut blips: Blips = vec![];
    for row in rdr.deserialize() {
        let rec: NeoStumblerCsvRow = row?;
        let loc = Coord {
            x: rec.longitude.parse()?,
            y: rec.latitude.parse()?,
        };
        blips.push((loc, parse_macaddr(&rec.macAddress)?, rec.ssid));
    }
    let url = conf.fetch_url.clone().unwrap();
    geolocate_blips(&url, blips).await?;
    Ok(())
}
/// Iteratively narrow down a 4-segment geohash from WiFi blips and
/// convert the resulting path to a probabilistic location.
async fn geolocate_blips(url: &str, blips: Vec<(Coord, [u8; 6], String)>) -> Result<Plocation> {
    let mut stats = CacheStats {
        hit_cnt: 0,
        tot_cnt: 0,
    };
    let reqc = reqwest::Client::new();
    let mut geopath = "".to_string();
    for step in 0..4 {
        debug!("[lookup] step n. {step}");
        let (sel, segment) =
            cached_remote_lookup(&reqc, url, &geopath, &blips, &mut stats).await?;
        geopath.push_str(&segment);
        info!(
            "[lookup] [{} {}] https://geohash.jorren.nl/#{}",
            step, sel, geopath
        );
        // BUGFIX: guard against division by zero when `blips` is empty
        // (tot_cnt stays 0 in that case).
        if stats.tot_cnt > 0 {
            debug!(
                "[lookup] Cacheing efficiency: {}%",
                stats.hit_cnt * 100 / stats.tot_cnt
            )
        }
    }
    geohash_to_plocation(&geopath)
}
async fn locate_from_cli(conf: &Conf) -> Result<()> {
todo!();
Ok(())
}
/// One row of a NeoStumbler CSV export. All fields are kept as strings
/// and parsed on demand; field names match the CSV header (camelCase),
/// hence the non_snake_case allowance.
#[allow(non_snake_case)]
#[derive(Debug, serde::Deserialize, Eq, PartialEq)]
struct NeoStumblerCsvRow {
    timestamp: String,
    latitude: String,
    longitude: String,
    locationAccuracy: String,
    locationAge: String,
    speed: String,
    macAddress: String,
    wifiScanAge: String,
    signalStrength: String,
    ssid: String,
}
/// Parse a NeoStumbler CSV export into a freshly-built BloomHashStore.
fn load_csv(fname: &PathBuf) -> Result<BloomHashStore> {
    let file = std::fs::File::open(fname)?;
    let mut rdr = csv::ReaderBuilder::new().from_reader(file);
    let mut bhs = BloomHashStore::new();
    for row in rdr.deserialize() {
        let rec: NeoStumblerCsvRow = row?;
        bhs.insert_datapoint(
            rec.latitude.parse()?,
            rec.longitude.parse()?,
            &parse_macaddr(&rec.macAddress)?,
            &rec.ssid,
        );
    }
    debug!("Stats {:?}", bhs.stats());
    Ok(bhs)
}
/// Load a CSV export, pack it into a compressed blob, and POST it to
/// the configured upload endpoint.
async fn upload_csv(conf: &Conf, fname: &PathBuf) -> Result<()> {
    ensure!(conf.upload_url.is_some(), "Upload URL not configured");
    let blob = load_csv(fname)?.pack_then_compress()?;
    info!("[upload-csv] Compressed blob size {:?}", blob.len());
    let reqc = reqwest::Client::new();
    reqc.post(conf.upload_url.as_ref().unwrap())
        .body(blob)
        .send()
        .await?
        .error_for_status()?;
    info!("[upload-csv] done");
    Ok(())
}
/// Benchmark helper: upload 1000 blobs of 10k random datapoints each to
/// the configured upload endpoint. The seeded RNG keeps runs reproducible.
async fn upload_random_data_bench(conf: &Conf) -> Result<()> {
    ensure!(conf.upload_url.is_some(), "Upload URL not configured");
    let mut rng: StdRng = SeedableRng::seed_from_u64(3);
    // PERF: hoisted out of the loop — reqwest::Client holds a connection
    // pool and is designed to be reused; it was previously re-created on
    // every one of the 1000 iterations.
    let reqc = reqwest::Client::new();
    for _ in 0..1000 {
        let mut bhs = BloomHashStore::new();
        for _ in 0..10_000 {
            let lat = rng.gen_range(-90.0..=90.0);
            let lon = rng.gen_range(-180.0..=180.0);
            let mac: Mac = rng.gen();
            let ssid = Alphanumeric.sample_string(&mut rng, 16);
            bhs.insert_datapoint(lat, lon, &mac, &ssid)
        }
        let blob = bhs.pack_then_compress()?;
        info!("[upload-bench] Compressed blob size {:?}", blob.len());
        let resp = reqc
            .post(conf.upload_url.as_ref().unwrap())
            .body(blob)
            .send()
            .await?;
        resp.error_for_status()?;
    }
    info!("[upload-bench] done");
    Ok(())
}
/// Simple on-disk cache of bloom-hash blobs under the XDG cache dir.
struct Cache {
    // Directory holding one `cache_<key>` file per entry.
    cachedir: PathBuf,
}
impl Cache {
    /// Create (or reuse) the per-user XDG cache directory.
    async fn new() -> Result<Self> {
        let xdg_dirs = xdg::BaseDirectories::with_prefix("libreloc_client")?;
        let cachedir = xdg_dirs.create_cache_directory("cache")?;
        Ok(Cache { cachedir })
    }
    /// Delete cache files whose mtime is older than `max_age`.
    // NOTE(review): currently unreferenced within this file — confirm callers.
    async fn prune_old_files(&self, max_age: Duration) -> Result<()> {
        let now = SystemTime::now();
        let mut fnames = fs::read_dir(&self.cachedir).await?;
        loop {
            let file = fnames.next_entry().await?;
            if let Some(f) = file {
                let mtime = f.metadata().await?.modified()?;
                if now.duration_since(mtime).unwrap_or_default() > max_age {
                    debug!("[cache] deleting {}", f.path().to_string_lossy());
                    fs::remove_file(f.path()).await?;
                }
            } else {
                return Ok(());
            }
        }
    }
    /// Filesystem path backing a given cache key.
    fn file_path(&self, key: &str) -> PathBuf {
        self.cachedir.join(format!("cache_{}", key))
    }
    /// Persist `data` under `key`, overwriting any previous entry.
    async fn store_data(&self, key: &str, data: &Bytes) -> Result<()> {
        let fname = self.file_path(key);
        let mut file = fs::File::create(&fname)
            .await
            .with_context(|| format!("Failed to create {:?}", fname))?;
        file.write_all(data)
            .await
            .with_context(|| format!("Failed to write to {:?}", fname))?;
        Ok(())
    }
    /// Read a cached entry; returns `None` on miss OR read failure.
    async fn read_data(&self, key: &str) -> Option<Vec<u8>> {
        let fname = self.file_path(key);
        let mut file = fs::File::open(&fname).await.ok()?;
        let mut data = Vec::new();
        // BUGFIX: a failed read (permissions, disk error) is now treated as
        // a cache miss instead of panicking via unwrap().
        file.read_to_end(&mut data).await.ok()?;
        Some(data)
    }
}
/// Convert an MLS "geosubmit" payload into a BloomHashStore and upload
/// the packed, compressed blob to the configured endpoint.
async fn upload_geosubmit(conf: &Conf, msg: MLSGeoSubmit) -> Result<()> {
    ensure!(conf.upload_url.is_some(), "Upload URL not configured");
    let mut bhs = BloomHashStore::new();
    for i in msg.items {
        info!("[api-sub] {:?} {:?}", i.timestamp, i.position);
        // Items without a position cannot contribute datapoints.
        let Some(p) = i.position else { continue };
        // NOTE(review): the nested loop suggests `wifiAccessPoints` is an
        // Option (or collection) of collections — confirm against the
        // MLSGeoSubmit type in libreloc_shared.
        for aps in i.wifiAccessPoints {
            for ap in aps {
                bhs.insert_datapoint(
                    p.latitude,
                    p.longitude,
                    &parse_macaddr(&ap.macAddress)?,
                    &ap.ssid.unwrap_or("".to_owned()),
                );
            }
        }
    }
    debug!("[api-sub] {:?}", bhs.stats());
    let blob = bhs.pack_then_compress()?;
    info!("[api-sub] Compressed blob size {:?}", blob.len());
    let reqc = reqwest::Client::new();
    let resp = reqc
        .post(conf.upload_url.as_ref().unwrap())
        .body(blob)
        .send()
        .await?;
    resp.error_for_status()?;
    Ok(())
}
/// Axum handler: accept an MLS geosubmit JSON body and forward it to the
/// configured upload endpoint, mapping failure to 503.
async fn post_mls_geosubmit(
    State(app_state): State<AppState>,
    axum::extract::Json(msg): axum::extract::Json<MLSGeoSubmit>,
) -> impl IntoResponse {
    info!("[api-sub] geosubmit received");
    match upload_geosubmit(&app_state.conf, msg).await {
        Ok(()) => {
            info!("[api-sub] done");
            StatusCode::OK
        }
        Err(e) => {
            info!("[api-sub] error {}", e);
            StatusCode::SERVICE_UNAVAILABLE
        }
    }
}
/// Axum handler: minimal component-identification JSON served at `/`.
async fn get_stats() -> impl IntoResponse {
    debug!("GET stats");
    let body = serde_json::json!({
        "component": "client",
    });
    (StatusCode::OK, axum::Json(body))
}
// NOTE(review): DashRow is not referenced anywhere in this file — possibly
// dead code, or consumed by the template; confirm before removing.
struct DashRow {
    status: String,
    name: String,
}
/// Context for the sailfish dashboard template (templates/dashboard.stpl).
#[derive(TemplateOnce)]
#[template(path = "dashboard.stpl")]
struct DashTemplate<'a> {
    // Per-source rows shown in the dashboard table.
    dash_tbl: Vec<&'a Source>,
    tstamp: &'a str,
    // Inlined contents of templates/dashboard.js.
    js_blob: &'a str,
}
/// Axum handler: render the HTML dashboard from the sailfish template,
/// embedding the JS blob and the current per-source location table.
async fn serve_http_get_dashboard(
    State(app_state): State<AppState>,
) -> axum::response::Html<String> {
    // Bundled at compile time so the binary is self-contained.
    let js_blob = include_str!("../templates/dashboard.js");
    debug!("GET dashboard");
    let location_state = app_state.location_state.lock().unwrap();
    let dash_tbl = location_state.values().collect();
    let tstamp = "".into();
    let tpl = DashTemplate {
        dash_tbl,
        tstamp,
        js_blob,
    };
    // NOTE(review): a render_once() failure panics the handler task;
    // consider mapping it to a 500 response instead.
    axum::response::Html(tpl.render_once().unwrap())
}
/// Axum handler: return the current fused location as JSON.
async fn serve_http_get_location(State(app_state): State<AppState>) -> impl IntoResponse {
    debug!("GET /v2/location");
    let loc = app_state.location.lock().unwrap();
    // NOTE(review): the `* 100.` scaling of lsm into "r" looks like a unit
    // conversion — confirm the intended units of the radius field.
    let body = serde_json::json!({
        "lat": loc.loc.y,
        "lon": loc.loc.x,
        "r": loc.lsm * 100.,
    });
    (StatusCode::OK, axum::Json(body))
}
/// Start the local HTTP API (stats, dashboard, location, geosubmit).
///
/// Listens on `conf.listen_addr`, defaulting to 127.0.0.1:3939.
async fn run_location_service_api(conf: Conf, app_state: AppState) -> Result<()> {
    let addr: std::net::SocketAddr = conf
        .listen_addr
        .clone()
        .unwrap_or("127.0.0.1:3939".to_owned())
        .parse()?;
    let app: Router = Router::new()
        .route("/", get(get_stats))
        .route("/v2/ui", get(serve_http_get_dashboard))
        .route("/v2/location", get(serve_http_get_location))
        .route("/v2/geosubmit", post(post_mls_geosubmit))
        .layer(
            tower::ServiceBuilder::new()
                .layer(tower_http::decompression::RequestDecompressionLayer::new()),
        )
        .with_state(app_state);
    // BUGFIX: propagate bind/serve failures through the Result this function
    // already returns instead of unwrap()-panicking inside the spawned task.
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .with_context(|| format!("Failed to bind {}", addr))?;
    info!("[api] starting at addr {:?}", addr);
    axum::serve(listener, app)
        .await
        .context("HTTP server terminated")?;
    Ok(())
}