use jni::{JNIEnv, JavaVM, InitArgsBuilder, JNIVersion, NativeMethod};
use jni::objects::{JObject, JValue, JString};
use std::convert::TryInto;
use std::ffi::c_void;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, atomic::{AtomicI32,Ordering}};
use crate::error::Error;
use crate::value_object::ValueObject;
#[derive(Clone)]
pub struct TransSecsWrapper {
uid:i32,
jvm:Arc<Mutex<JavaVM>>,
listener_map:Arc<Mutex<HashMap<u128, Box<dyn FnMut(String, ValueObject)+Send>>>>
}
type ListenerUUID = u128;
const QUALITY_OUT_OF_SERVICE:i32 = 2003;
lazy_static! {
static ref UID_WRAPPER_MAP:Arc<Mutex<HashMap<i32, TransSecsWrapper>>> = Arc::new(Mutex::new(HashMap::new()));
}
static JVM_COUNTER:AtomicI32 = AtomicI32::new(0);
impl TransSecsWrapper {
pub fn new<'a, S>(classpath:S) -> Result<TransSecsWrapper,Error> where S:Into<String> {
let jvm_args = InitArgsBuilder::new()
.version(JNIVersion::V8)
.option(format!("-Djava.class.path={}", classpath.into()).as_str())
.option(format!("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000").as_str())
.build().unwrap();
let jvm = JavaVM::new(jvm_args);
if jvm.is_err() {
return Err(Error::JVMCreationFailed);
}
let uid = JVM_COUNTER.fetch_add(1, Ordering::Relaxed);
let jvm = jvm.unwrap();
let wrapper = TransSecsWrapper{uid,jvm:Arc::new(Mutex::new(jvm)), listener_map:Arc::new(Mutex::new(HashMap::new()))};
wrapper.register_native_methods().unwrap();
UID_WRAPPER_MAP.lock().unwrap().insert(uid, wrapper.clone());
return Ok(wrapper);
}
fn register_native_methods(&self) -> Result<(), Error>{
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let listener_class = env.find_class("com/ergotech/vib/servers/nativewrapper/NativeTopicListener");
if listener_class.is_err() {
return Err(Error::InternalError(format!("Failed to find listener class: {}", listener_class.unwrap_err())));
}
let listener_class = listener_class.unwrap();
env.call_static_method(listener_class, "setWrapperUID", "(I)V", &[JValue::from(self.uid)]).unwrap();
let raw = TransSecsWrapper::topic_value_changed as *mut() as *mut c_void;
let native_method = NativeMethod{name:"topicValueChanged".into(), sig:"(Ljava/lang/String;Lcom/ergotech/vib/valueobjects/ValueObjectInterface;)V".into(), fn_ptr:raw};
let res = env.register_native_methods(listener_class, &[native_method]);
if res.is_err() {
return Err(Error::InternalError(format!("Failed to register topicValueChanged: {}", res.unwrap_err())));
} else {
return Ok(());
}
}
fn topic_value_changed(env:JNIEnv, listener_object:JObject, topic_string:JString, value_jobject:JObject) {
let listener_class = env.get_object_class(listener_object).unwrap();
let res = env.call_static_method(listener_class, "getWrapperUID", "()I", &[]);
if res.is_err() {
eprintln!("Failed to get wrapper UID: {}", res.unwrap_err());
return;
}
let uid = res.unwrap().i().unwrap();
let res = env.call_method(listener_object, "getListenerUUID", "()[B", &[]);
if res.is_err() {
eprintln!("Failed to get listener UUID: {}", res.unwrap_err());
return;
}
let res = res.unwrap().l().unwrap();
let uuid_bytes = env.convert_byte_array(*res).unwrap();
let uuid_buf:Result<[u8;16], _> = uuid_bytes[0..16].try_into();
if uuid_buf.is_err() {
eprintln!("Bad uuid buffer: {}", uuid_buf.unwrap_err());
return;
}
let uuid_buf = uuid_buf.unwrap();
let uuid:u128 = u128::from_be_bytes(uuid_buf);
let topic_string = env.get_string(JString::from(topic_string));
if topic_string.is_err() {
eprintln!("Error getting string from Java string");
return;
}
let topic_string = topic_string.unwrap();
let test_value = topic_string.to_str();
let topic_string_value;
if test_value.is_err() {
eprintln!("Warning: string value contained invalid utf8 characters, returning a lossy string");
topic_string_value = topic_string.to_string_lossy().to_string();
} else {
topic_string_value = test_value.unwrap().to_string();
}
let value_object = ValueObject::from_jobject(&env, value_jobject);
if value_object.is_err() {
eprintln!("Error getting ValueObject from jobject: {:?}", value_object.unwrap_err());
return;
}
let value_object = value_object.unwrap();
let wrapper_map = UID_WRAPPER_MAP.lock().unwrap();
let wrapper = &wrapper_map[&uid];
let mut listener_map = wrapper.listener_map.lock().unwrap();
let callback = listener_map.get_mut(&uuid).unwrap();
callback(topic_string_value, value_object);
}
pub fn subscribe<'a, S, F>(&self, topic:S, callback:F) -> Result<ListenerUUID, Error> where S:Into<String>, F:FnMut(String, ValueObject) + Send + 'static {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let listener_class = env.find_class("com/ergotech/vib/servers/nativewrapper/NativeTopicListener");
if listener_class.is_err() {
return Err(Error::InternalError(format!("Failed to find listener class: {}", listener_class.unwrap_err())));
}
let listener_class = listener_class.unwrap();
let listener_object = env.new_object(listener_class, "()V", &[]).unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
env.call_method(broker, "subscribe", "(Ljava/lang/String;Lcom/ergotech/vib/servers/nativewrapper/TopicListener;)V", &[JValue::Object(*topic_jstring), JValue::Object(listener_object)]).unwrap();
let res = env.call_method(listener_object, "getListenerUUID", "()[B", &[]);
if res.is_err() {
return Err(Error::InternalError(format!("Failed to get listener uuid: {}", res.unwrap_err())));
}
let res = res.unwrap().l().unwrap();
let uuid_bytes = env.convert_byte_array(*res).unwrap();
let uuid_buf:Result<[u8;16], _> = uuid_bytes[0..16].try_into();
if uuid_buf.is_err() {
return Err(Error::InternalError(format!("Bad uuid buffer: {}", uuid_buf.unwrap_err())));
}
let uuid_buf = uuid_buf.unwrap();
let uuid:u128 = u128::from_be_bytes(uuid_buf);
let mut listener_map = self.listener_map.lock().unwrap();
listener_map.insert(uuid, Box::new(callback));
return Ok(uuid)
}
pub fn unsubscribe(&self, listener_uuid:ListenerUUID) -> Result<(), Error> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let bytes = u128::to_be_bytes(listener_uuid);
let java_bytes = env.byte_array_from_slice(&bytes).unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let res = env.call_method(broker, "removeListener", "(Ljava/lang/String;[B)V", &[JValue::from(java_bytes)]);
if res.is_err() {
return Err(Error::InternalError(format!("Failed to remove listener: {}", res.unwrap_err())));
} else {
return Ok(());
}
}
pub fn start_main<S>(&self, package_name:S) -> Result<(), Error> where S:Into<String> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let main_qualified_name = format!("deploy/{}/EquipmentController", package_name.into());
let controller_class = env.find_class(&main_qualified_name);
if controller_class.is_err() {
return Err(Error::BadControllerClass(main_qualified_name));
}
let controller_class = controller_class.unwrap();
let res = env.call_static_method(controller_class, "startMain", "()V", &[]);
if res.is_err() {
return Err(Error::InternalError(format!("Failed to start controller class: {}", res.unwrap_err())));
} else {
return Ok(());
}
}
pub fn add_test_values_to_broker(&self) -> Result<(), Error> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let broker_class = env.find_class("com/ergotech/vib/servers/nativewrapper/TopicBroker").unwrap();
let res = env.call_static_method(broker_class, "addTestData", "()V", &[]);
if res.is_err() {
eprintln!("{}", res.unwrap_err());
}
return Ok(());
}
pub fn get<S>(&self, topic:S) -> Result<ValueObject, Error> where S:Into<String> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
let res = env.call_method(broker, "get", "(Ljava/lang/String;)Lcom/ergotech/vib/valueobjects/ValueObjectInterface;", &[JValue::Object(*topic_jstring)]);
if res.is_err() {
return Err(Error::InternalError(format!("Error calling get on broker: {}", res.unwrap_err())))
}
let value_jobject = res.unwrap().l().unwrap();
let quality = env.call_method(value_jobject, "getQuality", "()I", &[]);
if quality.is_err() {
return Err(Error::InternalError(format!("Error calling getTimeMillis on value object: {}", quality.unwrap_err())))
}
let quality = quality.unwrap().i().unwrap();
if quality == QUALITY_OUT_OF_SERVICE {
return Err(Error::NoSuchTopic);
}
return ValueObject::from_jobject(&env, value_jobject);
}
pub fn publish_int<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<i64> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
let res = env.call_method(broker, "publish", "(Ljava/lang/String;J)V", &[JValue::Object(*topic_jstring), JValue::from(value.into())]);
if res.is_err() {
return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
}
return Ok(());
}
pub fn publish_bool<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<bool> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
let res = env.call_method(broker, "publish", "(Ljava/lang/String;Z)V", &[JValue::Object(*topic_jstring), JValue::from(value.into())]);
if res.is_err() {
return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
}
return Ok(());
}
pub fn publish_string<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<String> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
let value_jstring = env.new_string(value.into()).unwrap();
let res = env.call_method(broker, "publish", "(Ljava/lang/String;Z)V", &[JValue::Object(*topic_jstring), JValue::from(value_jstring)]);
if res.is_err() {
return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
}
return Ok(());
}
pub fn publish_json<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<String> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
let value_jstring = env.new_string(value.into()).unwrap();
let res = env.call_method(broker, "publishObject", "(Ljava/lang/String;Z)V", &[JValue::Object(*topic_jstring), JValue::from(value_jstring)]);
if res.is_err() {
return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
}
return Ok(());
}
pub fn publish_bytes<S>(&self, topic:S, value:&[u8]) -> Result<(), Error> where S:Into<String>{
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let byte_array = env.byte_array_from_slice(value);
if byte_array.is_err() {
return Err(Error::InternalError(format!("Error getting byte array from slice: {}", byte_array.unwrap_err())))
}
let byte_array = byte_array.unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
let res = env.call_method(broker, "publish", "(Ljava/lang/String;[B])V", &[JValue::Object(*topic_jstring), JValue::from(byte_array)]);
if res.is_err() {
return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
}
return Ok(());
}
pub fn publish_float<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<f64> {
let jvm = self.jvm.lock().unwrap();
let env = jvm.attach_current_thread();
if env.is_err() {
return Err(Error::JNIEnvError);
}
let env = env.unwrap();
let broker = TransSecsWrapper::get_broker_instance(&env);
let topic_jstring = env.new_string(topic.into()).unwrap();
let res = env.call_method(broker, "publish", "(Ljava/lang/String;D)V", &[JValue::Object(*topic_jstring), JValue::from(value.into())]);
if res.is_err() {
return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
}
return Ok(());
}
pub fn alarm<S>(&self, alarm_name:S, set:bool) -> Result<(), Error> where S:Into<String> {
return self.publish_bool(format!("gemtool/ceids/{}/trigger", alarm_name.into().to_lowercase()), set)
}
pub fn trigger<S>(&self, event_name:S, ceid_trigger:S) -> Result<(), Error> where S:Into<String> {
return self.publish_string(format!("gemtool/ceids/{}/trigger", event_name.into().to_lowercase()), ceid_trigger)
}
fn get_broker_instance<'a>(env:&'a JNIEnv) -> JObject<'a> {
let broker_class = env.find_class("com/ergotech/vib/servers/nativewrapper/TopicBroker").unwrap();
let res = env.call_static_method(broker_class, "getInstance", "()Lcom/ergotech/vib/servers/nativewrapper/TopicBroker;", &[]);
return res.unwrap().l().unwrap();
}
}