Problem scenario:
Start a single-node Ignite server, start one thin client and register a continuous query listener on it, then use 50 threads to put 500 entries into the cache concurrently through a second thin client.
Observed behavior:
The output printed by the listener shows that the number of events received varies from run to run: sometimes 496, sometimes 499, sometimes all 500.
Test Code:
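import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class StartServer {
    public static void main(String[] args) {
        // Start a single server node with the default configuration.
        Ignite ignite = Ignition.start();
    }
}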
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class StartThinClient {
    public static void main(String[] args) throws InterruptedException {
        String addr = "127.0.0.1:10800";
        int threadNum = 50;

        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setAddresses(addr);

        // First thin client: registers the continuous query listener.
        IgniteClient client1 = Ignition.startClient(clientConfiguration);
        ClientCache<Object, Object> cache1 = client1.getOrCreateCache("test");
        ContinuousQuery<Object, Object> query = new ContinuousQuery<>();
        query.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent<?, ?>> cacheEntryEvents) throws CacheEntryListenerException {
                // Print the key of every event delivered to the listener.
                for (CacheEntryEvent<?, ?> event : cacheEntryEvents) {
                    System.out.println("----" + event.getKey());
                }
            }
        });
        cache1.query(query);

        // Second thin client: performs the concurrent inserts.
        IgniteClient client2 = Ignition.startClient(clientConfiguration);
        ClientCache<Object, Object> cache2 = client2.cache("test");

        Thread[] threads = new Thread[threadNum];
        for (int i = 0; i < threads.length; ++i) {
            threads[i] = new Thread(new OperationInsert(cache2, i, 500, threadNum));
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        // Give the listener time to receive any remaining events.
        Thread.sleep(60000);
    }

    static class OperationInsert implements Runnable {
        private final ClientCache<Object, Object> cache;
        private final int k;          // index of this thread
        private final int testRows;   // total number of entries across all threads
        private final int threadCnt;  // number of inserting threads

        public OperationInsert(ClientCache<Object, Object> cache, int k, int testRows, int threadCnt) {
            this.cache = cache;
            this.k = k;
            this.testRows = testRows;
            this.threadCnt = threadCnt;
        }

        @Override
        public void run() {
            // Each thread inserts its own contiguous slice of testRows / threadCnt keys.
            int perThread = testRows / threadCnt;
            for (int i = 1000000 + perThread * k; i < 1000000 + perThread * (k + 1); i++) {
                cache.put("" + i, "aaa");
            }
        }
    }
}
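To rule out the test itself as the cause, here is a quick standalone check of the key arithmetic. It is not part of the original test program, and the class name KeyRangeCheck and method keysFor are made up for illustration. It reuses the loop bounds from OperationInsert and confirms that 50 threads with 500 rows produce 500 distinct keys, so 500 events is the expected count.

```java
import java.util.HashSet;
import java.util.Set;

public class KeyRangeCheck {
    /** Collects the keys all threads would insert, using the same loop bounds as OperationInsert. */
    static Set<String> keysFor(int testRows, int threadCnt) {
        Set<String> keys = new HashSet<>();
        int perThread = testRows / threadCnt; // integer division, as in the test code
        for (int k = 0; k < threadCnt; k++) {
            for (int i = 1000000 + perThread * k; i < 1000000 + perThread * (k + 1); i++) {
                keys.add("" + i);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // 500 / 50 = 10 keys per thread, covering 1000000 to 1000499 without overlap.
        System.out.println("distinct keys: " + keysFor(500, 50).size());
    }
}
```

Because the per-thread ranges are contiguous and do not overlap, every put creates a new entry, and a complete run should deliver exactly one event per key.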
Version:
The test program uses Ignite 2.15.0.
When I inserted the data with a single thread, I did not observe any event loss. I also tried Ignite clusters of two and three nodes, and there the listener received all 500 events even with multi-threaded inserts. Does this issue only occur with a single-node cluster? Is there a recommended fix or workaround?
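To see which keys go missing rather than only how many, the listener could record keys in a thread-safe set instead of printing them. The following is a minimal sketch using only the JDK. The class EventTally and its methods are hypothetical helpers, not Ignite APIs, and the main method only simulates a listener that misses one key.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

public class EventTally {
    // Concurrent set, so record() can be called from whichever thread runs the listener.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Records one event key; duplicates are counted once. */
    public void record(Object key) {
        seen.add(String.valueOf(key));
    }

    /** Number of distinct keys recorded so far. */
    public int count() {
        return seen.size();
    }

    /** Returns the keys in [first, first + total) that were never recorded, in sorted order. */
    public Set<String> missing(int first, int total) {
        Set<String> missing = new TreeSet<>();
        for (int i = first; i < first + total; i++) {
            String key = String.valueOf(i);
            if (!seen.contains(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        EventTally tally = new EventTally();
        // Simulated listener input: every key except 1000123 arrives.
        for (int i = 1000000; i < 1000500; i++) {
            if (i != 1000123) {
                tally.record("" + i);
            }
        }
        System.out.println("received: " + tally.count());               // received: 499
        System.out.println("missing: " + tally.missing(1000000, 500));  // missing: [1000123]
    }
}
```

In the test program, the listener would call tally.record(next.getKey()) in place of the println, and main would print tally.missing(1000000, 500) after the threads have joined and the wait has elapsed. Knowing whether the missing keys cluster at the start of the run or are spread across it may help tell a listener registration race apart from loss under load.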