1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
use jni::{JNIEnv, JavaVM, InitArgsBuilder, JNIVersion, NativeMethod};
use jni::objects::{JObject, JValue, JString};
use std::convert::TryInto;
use std::ffi::c_void;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, atomic::{AtomicI32,Ordering}};
use crate::error::Error;
use crate::value_object::ValueObject;

/// Handle that exposes every TransSecs operation and owns the JVM they run in.
///
/// The JVM and the listener table both live behind `Arc`s, so clones of a
/// wrapper share the same underlying state.
#[derive(Clone)]
pub struct TransSecsWrapper {
    /// Identifier of this wrapper. It is a signed 32-bit integer so that it
    /// can be passed to Java as a plain `int`.
    uid: i32,
    /// The embedded Java VM, shared between clones and guarded by a mutex.
    jvm: Arc<Mutex<JavaVM>>,
    /// Subscription callbacks, keyed by the UUID of the Java listener object.
    listener_map: Arc<Mutex<HashMap<u128, Box<dyn FnMut(String, ValueObject) + Send>>>>,
}

/// Identifier returned by `subscribe` and accepted by `unsubscribe`.
type ListenerUUID = u128;

/// Quality code that `get` reports as `Error::NoSuchTopic`.
const QUALITY_OUT_OF_SERVICE: i32 = 2003;

// Every wrapper created by `TransSecsWrapper::new`, keyed by its UID, so that
// the native listener callback can locate the matching listener table.
lazy_static! {
    static ref UID_WRAPPER_MAP: Arc<Mutex<HashMap<i32, TransSecsWrapper>>> =
        Arc::new(Mutex::new(HashMap::new()));
}

/// Source of wrapper UIDs; `TransSecsWrapper::new` takes the next value after
/// the JVM has been created.
static JVM_COUNTER: AtomicI32 = AtomicI32::new(0);

impl TransSecsWrapper {
    /// Create a new TransSecsWrapper, using a classpath which must include the runtime jar
    pub fn new<'a, S>(classpath:S) -> Result<TransSecsWrapper,Error> where S:Into<String> {
        let jvm_args = InitArgsBuilder::new()
            .version(JNIVersion::V8)
            .option(format!("-Djava.class.path={}", classpath.into()).as_str())
            .option(format!("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000").as_str())
            // .option(format!("-verbose:class").as_str())
            .build().unwrap();
        let jvm = JavaVM::new(jvm_args);
        if jvm.is_err() {
            return Err(Error::JVMCreationFailed);
        }
        let uid = JVM_COUNTER.fetch_add(1, Ordering::Relaxed);
        let jvm = jvm.unwrap();
        let wrapper = TransSecsWrapper{uid,jvm:Arc::new(Mutex::new(jvm)), listener_map:Arc::new(Mutex::new(HashMap::new()))};
        wrapper.register_native_methods().unwrap();
        UID_WRAPPER_MAP.lock().unwrap().insert(uid, wrapper.clone());
        return Ok(wrapper);
    }

    /// Register topicValueChanged and also set the UID on the NativeTopicListener class
    fn register_native_methods(&self) -> Result<(), Error>{
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let listener_class = env.find_class("com/ergotech/vib/servers/nativewrapper/NativeTopicListener");
        if listener_class.is_err() {
            return Err(Error::InternalError(format!("Failed to find listener class: {}", listener_class.unwrap_err())));
        }
        let listener_class = listener_class.unwrap();

        env.call_static_method(listener_class, "setWrapperUID", "(I)V", &[JValue::from(self.uid)]).unwrap();

        let raw = TransSecsWrapper::topic_value_changed as *mut() as *mut c_void;
        let native_method = NativeMethod{name:"topicValueChanged".into(), sig:"(Ljava/lang/String;Lcom/ergotech/vib/valueobjects/ValueObjectInterface;)V".into(), fn_ptr:raw};
        let res = env.register_native_methods(listener_class, &[native_method]);
        if res.is_err() {
            return Err(Error::InternalError(format!("Failed to register topicValueChanged: {}", res.unwrap_err())));
        } else {
            return Ok(());
        }
    }

    /// 
    fn topic_value_changed(env:JNIEnv, listener_object:JObject, topic_string:JString, value_jobject:JObject) {
        let listener_class = env.get_object_class(listener_object).unwrap();
        let res = env.call_static_method(listener_class, "getWrapperUID", "()I", &[]);
        if res.is_err() {
            eprintln!("Failed to get wrapper UID: {}", res.unwrap_err());
            return;
        }
        let uid = res.unwrap().i().unwrap();

        let res = env.call_method(listener_object, "getListenerUUID", "()[B", &[]);
        if res.is_err() {
            eprintln!("Failed to get listener UUID: {}", res.unwrap_err());
            return;
        }
        let res = res.unwrap().l().unwrap();
        let uuid_bytes = env.convert_byte_array(*res).unwrap();
        let uuid_buf:Result<[u8;16], _> = uuid_bytes[0..16].try_into();
        if uuid_buf.is_err() {
            eprintln!("Bad uuid buffer: {}", uuid_buf.unwrap_err());
            return;
        }
        let uuid_buf = uuid_buf.unwrap();
        let uuid:u128 = u128::from_be_bytes(uuid_buf);

        let topic_string = env.get_string(JString::from(topic_string));
        if topic_string.is_err() {
            eprintln!("Error getting string from Java string");
            return;
        }
        let topic_string = topic_string.unwrap();
        let test_value = topic_string.to_str();
        let topic_string_value;
        if test_value.is_err() {
            eprintln!("Warning: string value contained invalid utf8 characters, returning a lossy string");
            topic_string_value = topic_string.to_string_lossy().to_string();
        } else {
            topic_string_value = test_value.unwrap().to_string();
        }

        let value_object = ValueObject::from_jobject(&env, value_jobject);
        if value_object.is_err() {
            eprintln!("Error getting ValueObject from jobject: {:?}", value_object.unwrap_err());
            return;
        }
        let value_object = value_object.unwrap();

        let wrapper_map = UID_WRAPPER_MAP.lock().unwrap();
        let wrapper = &wrapper_map[&uid];
        let mut listener_map = wrapper.listener_map.lock().unwrap();
        
        let callback = listener_map.get_mut(&uuid).unwrap();

        callback(topic_string_value, value_object);

    }

    /// Add a new listener to the broker. Returns the listener uuid which is necessary to unsubscribe, or an error.
    pub fn subscribe<'a, S, F>(&self, topic:S, callback:F) -> Result<ListenerUUID, Error> where S:Into<String>, F:FnMut(String, ValueObject) + Send + 'static {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let listener_class = env.find_class("com/ergotech/vib/servers/nativewrapper/NativeTopicListener");
        if listener_class.is_err() {
            return Err(Error::InternalError(format!("Failed to find listener class: {}", listener_class.unwrap_err())));
        }
        let listener_class = listener_class.unwrap();
        let listener_object = env.new_object(listener_class, "()V", &[]).unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        env.call_method(broker, "subscribe", "(Ljava/lang/String;Lcom/ergotech/vib/servers/nativewrapper/TopicListener;)V", &[JValue::Object(*topic_jstring), JValue::Object(listener_object)]).unwrap();
        let res = env.call_method(listener_object, "getListenerUUID", "()[B", &[]);
        if res.is_err() {
            return Err(Error::InternalError(format!("Failed to get listener uuid: {}", res.unwrap_err())));
        }
        let res = res.unwrap().l().unwrap();
        let uuid_bytes = env.convert_byte_array(*res).unwrap();
        let uuid_buf:Result<[u8;16], _> = uuid_bytes[0..16].try_into();
        if uuid_buf.is_err() {
            return Err(Error::InternalError(format!("Bad uuid buffer: {}", uuid_buf.unwrap_err())));
        }
        let uuid_buf = uuid_buf.unwrap();
        let uuid:u128 = u128::from_be_bytes(uuid_buf);
        let mut listener_map = self.listener_map.lock().unwrap();
        listener_map.insert(uuid, Box::new(callback));
        return Ok(uuid) // TODO Error handle
    }

    /// Unsubscribe from a topic, using the listener_uuid that is returned from a subscription
    pub fn unsubscribe(&self, listener_uuid:ListenerUUID) -> Result<(), Error> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let bytes = u128::to_be_bytes(listener_uuid);
        let java_bytes = env.byte_array_from_slice(&bytes).unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let res = env.call_method(broker, "removeListener", "(Ljava/lang/String;[B)V", &[JValue::from(java_bytes)]);
        if res.is_err() {
            return Err(Error::InternalError(format!("Failed to remove listener: {}", res.unwrap_err())));
        } else {
            return Ok(());
        }
    }

    /// Start the main class for the wrapper
    pub fn start_main<S>(&self, package_name:S) -> Result<(), Error> where S:Into<String> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let main_qualified_name = format!("deploy/{}/EquipmentController", package_name.into());
        let controller_class = env.find_class(&main_qualified_name);
        if controller_class.is_err() {
            return Err(Error::BadControllerClass(main_qualified_name));
        }
        let controller_class = controller_class.unwrap();
        let res = env.call_static_method(controller_class, "startMain", "()V", &[]);
        if res.is_err() {
            return Err(Error::InternalError(format!("Failed to start controller class: {}", res.unwrap_err())));
        } else {
            return Ok(());
        }
    }

    /// Add a few test values to the broker
    pub fn add_test_values_to_broker(&self) -> Result<(), Error> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let broker_class = env.find_class("com/ergotech/vib/servers/nativewrapper/TopicBroker").unwrap();
        let res = env.call_static_method(broker_class, "addTestData", "()V", &[]);
        if res.is_err() {
            eprintln!("{}", res.unwrap_err());
        }
        return Ok(());
    }

    /// Return the latest value associated with a topic. Will return Error::NoSuchTopic if the topic does not exist.
    pub fn get<S>(&self, topic:S) -> Result<ValueObject, Error> where S:Into<String> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        let res = env.call_method(broker, "get", "(Ljava/lang/String;)Lcom/ergotech/vib/valueobjects/ValueObjectInterface;", &[JValue::Object(*topic_jstring)]);
        if res.is_err() {
            return Err(Error::InternalError(format!("Error calling get on broker: {}", res.unwrap_err())))
        }
        let value_jobject = res.unwrap().l().unwrap();

        let quality = env.call_method(value_jobject, "getQuality", "()I", &[]);
        if quality.is_err() {
            return Err(Error::InternalError(format!("Error calling getTimeMillis on value object: {}", quality.unwrap_err())))
        }
        let quality = quality.unwrap().i().unwrap();
        if quality == QUALITY_OUT_OF_SERVICE {
            return Err(Error::NoSuchTopic);
        }

        return ValueObject::from_jobject(&env, value_jobject);
    }

    /// Publish an int to a topic. Will create the topic if it does not exist.
    pub fn publish_int<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<i64> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        let res = env.call_method(broker, "publish", "(Ljava/lang/String;J)V", &[JValue::Object(*topic_jstring), JValue::from(value.into())]);        
        if res.is_err() {
            return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
        }
        return Ok(());
    }

    /// Publish a bool to a topic. Will create the topic if it does not exist.
    pub fn publish_bool<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<bool> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        let res = env.call_method(broker, "publish", "(Ljava/lang/String;Z)V", &[JValue::Object(*topic_jstring), JValue::from(value.into())]);        
        if res.is_err() {
            return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
        }
        return Ok(());
    }

    /// Publish a string to a topic. Will create the topic if it does not exist.
    pub fn publish_string<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<String> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        let value_jstring = env.new_string(value.into()).unwrap();
        let res = env.call_method(broker, "publish", "(Ljava/lang/String;Z)V", &[JValue::Object(*topic_jstring), JValue::from(value_jstring)]);        
        if res.is_err() {
            return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
        }
        return Ok(());
    }

    /// Publish an object in json format to a topic. Will create the topic if it does not exist.
    pub fn publish_json<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<String> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        let value_jstring = env.new_string(value.into()).unwrap();
        let res = env.call_method(broker, "publishObject", "(Ljava/lang/String;Z)V", &[JValue::Object(*topic_jstring), JValue::from(value_jstring)]);        
        if res.is_err() {
            return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
        }
        return Ok(());
    }

    /// Publish a byte slice to a topic. Will create the topic if it does not exist.
    pub fn publish_bytes<S>(&self, topic:S, value:&[u8]) -> Result<(), Error> where S:Into<String>{
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();

        let byte_array = env.byte_array_from_slice(value);
        if byte_array.is_err() {
            return Err(Error::InternalError(format!("Error getting byte array from slice: {}", byte_array.unwrap_err())))
        }
        let byte_array = byte_array.unwrap();

        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        let res = env.call_method(broker, "publish", "(Ljava/lang/String;[B])V", &[JValue::Object(*topic_jstring), JValue::from(byte_array)]);        
        if res.is_err() {
            return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
        }
        return Ok(());
    }

    /// Publish a float to a topic. Will create the topic if it does not exist.
    pub fn publish_float<S, I>(&self, topic:S, value:I) -> Result<(), Error> where S:Into<String>, I:Into<f64> {
        let jvm = self.jvm.lock().unwrap();
        let env = jvm.attach_current_thread();
        if env.is_err() {
            return Err(Error::JNIEnvError);
        }
        let env = env.unwrap();
        let broker = TransSecsWrapper::get_broker_instance(&env);
        let topic_jstring = env.new_string(topic.into()).unwrap();
        let res = env.call_method(broker, "publish", "(Ljava/lang/String;D)V", &[JValue::Object(*topic_jstring), JValue::from(value.into())]);        
        if res.is_err() {
            return Err(Error::InternalError(format!("Error calling publish on broker: {}", res.unwrap_err())))
        }
        return Ok(());
    }

    /// Set or clear an alarm by publishing `set` as a boolean to
    /// `gemtool/ceids/<alarm_name, lowercased>/trigger`.
    ///
    /// Returns whatever `publish_bool` returns.
    ///
    /// NOTE(review): this is the same topic pattern `trigger` uses for events -
    /// confirm alarms are really meant to be published under `ceids`.
    pub fn alarm<S>(&self, alarm_name:S, set:bool) -> Result<(), Error> where S:Into<String> {
        let lowered_name = alarm_name.into().to_lowercase();
        let alarm_topic = format!("gemtool/ceids/{}/trigger", lowered_name);
        self.publish_bool(alarm_topic, set)
    }

    /// Fire an event by publishing `ceid_trigger` as a string to
    /// `gemtool/ceids/<event_name, lowercased>/trigger`.
    ///
    /// Both arguments share the type parameter `S`, so they must be of the same
    /// type. Returns whatever `publish_string` returns.
    pub fn trigger<S>(&self, event_name:S, ceid_trigger:S) -> Result<(), Error> where S:Into<String> {
        let lowered_name = event_name.into().to_lowercase();
        let event_topic = format!("gemtool/ceids/{}/trigger", lowered_name);
        self.publish_string(event_topic, ceid_trigger)
    }

    /// Fetch the Java `TopicBroker` object by calling the class's static
    /// `getInstance()` method.
    ///
    /// # Panics
    /// Panics if the broker class cannot be found, if `getInstance` fails, or
    /// if its result is not an object.
    fn get_broker_instance<'a>(env:&'a JNIEnv) -> JObject<'a> {
        let class = env.find_class("com/ergotech/vib/servers/nativewrapper/TopicBroker").unwrap();
        env.call_static_method(class, "getInstance", "()Lcom/ergotech/vib/servers/nativewrapper/TopicBroker;", &[])
            .unwrap()
            .l()
            .unwrap()
    }
}