xml_tools_ext.rs
1 //! Rust extensions to the athenaCL.libATH.xmlTools 2 3 use quick_xml::{events::Event, reader::Reader}; 4 use rustpython_vm::{pymodule, VirtualMachine}; 5 6 pub(crate) fn make_module(vm: &mut VirtualMachine) { 7 vm.add_native_module("xmlToolsExt", Box::new(_inner::make_module)); 8 } 9 10 #[pymodule] 11 pub(super) mod _inner { 12 use std::str; 13 14 use super::*; 15 use ahash::AHashMap; 16 use rustpython_vm::{convert::ToPyObject, PyResult}; 17 18 #[pyfunction(name = "xmlToPy")] 19 pub(crate) fn xml_to_py(code: String, vm: &VirtualMachine) -> PyResult { 20 let mut reader = Reader::from_str(&code); 21 reader.trim_text(true); 22 let mut buf = Vec::new(); 23 let mut stack = Vec::new(); 24 let root = vm.ctx.new_dict(); 25 let mut attrs_cache = AHashMap::<String, String>::new(); 26 27 loop { 28 match reader.read_event_into(&mut buf) { 29 Err(e) => { 30 let err_msg = 31 format!("Error parsing XML at {}: {:?}", reader.buffer_position(), e); 32 let value_err = vm.new_value_error(err_msg); 33 return Err(value_err); 34 } 35 36 Ok(Event::Start(e)) => { 37 let name = match e 38 .attributes() 39 .find(|a| a.as_ref().expect("cannot get attribute").key.as_ref() == b"name") 40 { 41 Some(attr) => reader 42 .decoder() 43 .decode(attr.expect("cannot get attribute").value.as_ref()) 44 .expect("cannot decode xml attribute") 45 .to_string(), 46 None => reader 47 .decoder() 48 .decode(e.name().as_ref()) 49 .expect("cannot decode xml element name") 50 .to_string(), 51 }; 52 53 let dict = vm.ctx.new_dict(); 54 stack.push((name, dict)); 55 } 56 57 Ok(Event::Empty(e)) => { 58 if let Some((_, dict)) = stack.last() { 59 for a in e.attributes() { 60 let attr = a.expect("cannot get attribute"); 61 let key = reader 62 .decoder() 63 .decode(attr.key.as_ref()) 64 .expect("cannot decode xml attribute"); 65 let value = reader 66 .decoder() 67 .decode(attr.value.as_ref()) 68 .expect("cannot decode xml attribute"); 69 70 attrs_cache.insert(key.to_string(), value.to_string()); 71 } 72 73 let value = vm 74 .ctx 75 .new_str(attrs_cache.remove("value").unwrap_or_default().as_str()); 76 77 dict.set_item( 78 &attrs_cache.remove("key").unwrap_or_default(), 79 value.to_pyobject(vm), 80 vm, 81 )?; 82 } 83 } 84 85 Ok(Event::End(_)) => { 86 if let Some((name, dict)) = stack.pop() { 87 let parent = stack.last().map(|(_, p)| p).unwrap_or(&root); 88 parent.set_item(&name, dict.to_pyobject(vm), vm)?; 89 } 90 } 91 92 Ok(Event::Eof) => break, 93 94 _ => {} 95 } 96 97 buf.clear(); 98 } 99 100 Ok(root.into()) 101 } 102 103 #[pyfunction(name = "checkFileFormat")] 104 pub(crate) fn check_file_format(content: String, vm: &VirtualMachine) -> PyResult { 105 let mut reader = Reader::from_str(&content); 106 reader.trim_text(true); 107 let mut buf = Vec::new(); 108 109 let mut res = vec!["xml".to_string(), "ok".to_string()]; 110 111 loop { 112 match reader.read_event_into(&mut buf) { 113 Ok(Event::Start(e)) => { 114 if e.name().as_ref() == b"athenaObject" { 115 break; 116 } 117 } 118 119 Ok(Event::Eof) => { 120 res[0] = "unknown".to_string(); 121 res[1] = "error reading the file".to_string(); 122 break; 123 } 124 125 Err(e) => { 126 res[0] = "unknown".to_string(); 127 res[1] = format!("Error parsing XML at {}: {:?}", reader.buffer_position(), e); 128 } 129 130 _ => {} 131 } 132 133 buf.clear(); 134 } 135 136 Ok(vm 137 .ctx 138 .new_tuple( 139 res.into_iter() 140 .map(|v| vm.ctx.new_str(v).to_pyobject(vm)) 141 .collect(), 142 ) 143 .into()) 144 } 145 }