I am trying to write a script that involves redis operations.
So after every minute, I call the testFunction (given below) this function gets all the keys till current minute from a redis sorted set (test_sorted_set) (each key in this sorted set represents another sorted set) then I call processData and processSet in parallel to make code execution faster. However, I am worried this might cause issues as processSet deletes some items from the member sorted set while processData gets the length of that sorted set.
Is using promise.allSettled correct in this scenario or should I instead go for two separate awaits?
const testFunction=async()=>{
//ts for current minute
let date = new Date();
date.setUTCSeconds(0);
date.setUTCMilliseconds(0);
const currentTimestamp = date.getTime();
//gets all keys till current minute (ZRANGE)
let [keys, rangeError] = await cacheService.getDataInRange(
'test_sorted_set',
'-inf',
currentTimestamp,
'BYSCORE'
);
for (let member of keys) {
const results = await Promise.allSettled([
processData(member),
processSet(member),
]);
let processSetSuccess = true;
for (let promiseResult of results) {
const [_, processSetError] = promiseResult['value'];
if (processSetError) {
processSetSuccess = false;
console.log(
'Something went wrong while processing: ',
processSetError
);
}
}
if (processSetSuccess) {
//do something
}
}
}
Process Data function:
const processData = async (member) => {
//get length of member sorted set(zlen)
const [count, countErr] = await cacheService.getSortedSetLength(member);
if (countErr) {
return [null, countErr];
}
let key = getKey(member);
//3 get data from hash for particular key
let [currentArr, hashError] = await cacheService.getFieldFromHash(
"test_data",
key,
);
if (hashError) {
return [null, hashError];
}
let hashArray = [];
currentArr = JSON.parse(currentArr);
if (currentArr && currentArr.length > 0) {
//array already present
hashArray = await getNewArray(currentArr, count, timestamp);
} else {
//new entry
hashArray = await getNewArray([], count, timestamp);
}
//4 Add updated array to hash
const [addSuccess, addErr] = await cacheService.addToHash(
"test_data",
key,
JSON.stringify(hashArray),
);
if (addErr) {
return [null, addErr];
}
return [true, null];
};
Process set function:
const processSet = async (member) => {
//Remove all keys with 0 score
const [removeSuccess, removeErr] = await cacheService.removeByScore(
member,
0,
0,
);
let newSortedSetKey = getNextTimestamp(member);
let newTS = parseInt(newSortedSetKey.split("_")[1]);
//Store Difference between current and next set in a temporary sorted set
const [_, diffErr] = await cacheService.storeSortedSetDifference(
`${newSortedSetKey}_temp`,
member,
newSortedSetKey,
);
//Combine the temporary set and next set (union)
const [destCount, unionErr] = await cacheService.unionStore(
newSortedSetKey,
newSortedSetKey,
`${newSortedSetKey}_temp`,
);
if (destCount > 0) {
//push to test_keys
const [addKeysSuccess, addKeysError] = await cacheService.addToSortedSet(
"sorted_set_1",
newTS,
newSortedSetKey,
);
}
const [delSuccess, delError] = await cacheService.deleteDataStructure(
`${newSortedSetKey}_temp`,
);
return [true, null];
};
Redis Service functions:
const getSortedSetLength = async (setKey) => {
const [data, err] = await redisConnection.zcard(setKey);
if (err) {
console.log('REDIS ERROR');
console.log(err);
captureException(err);
}
return [data, err];
};
const removeByScore = async (key, minScore, maxScore) => {
const [data, err] = await redisConnection.zremrangebyscore(key, minScore, maxScore);
if (err) {
console.log('REDIS ERROR');
console.log(err);
captureException(err);
}
return [data, err];
};