// TODO: Split into three diff files
import fetch from "node-fetch";
import { formatEther, parseUnits } from "ethers";
import "dotenv/config";
import { ApiPromise, WsProvider } from "@polkadot/api";
import axios from "axios";
import { feedAssetsConfig } from "./assetsConfig.js";
import * as child from "child_process";
import promptSync from "prompt-sync";
import { generateAccount } from "./utils/index.js";
function convertToInt(string) {
return parseInt(string.replace(/,/g, ""));
}
const ETHERSCAN_API_KEY = "7KT9JFYWU7UIGBUH4TV4QF21PTCE4W7UPN";
const ETHERSCAN_API_URL = "https://api-sepolia.etherscan.io/api";
function getRandomInt(max) {
return Math.floor(Math.random() * max);
}
class OffchainWorker {
#api;
#account;
#workerOn;
#allPendingWithdraws = [];
#allPendingDeposits = [];
#allResponses = [];
// Keeps track of all deposits that the worker need to check for punishable actions
#allDepositSlashes = [];
// Keeps track of all withdraws that the worker need to check for punishable actions
#allWithdrawSlashes = [];
constructor(api, account) {
this.#api = api;
this.#account = account;
this.#workerOn = true;
}
async #getHigherStaker() {
let higherStaker = await this.#api.query.stakingWorkers.higherStaker();
higherStaker = higherStaker.toJSON();
return higherStaker === this.#account.address;
}
// Will split mechanics into higherStaker and others
async #oracleWorker(api, account) {
api.rpc.chain.subscribeNewHeads(async (header) => {
const headerJson = JSON.parse(header);
const currentBlockNumber = headerJson.number;
const isHighestStaker = await this.#getHigherStaker();
// The following logic needs further optimizations
// Consider proposing on blocks currentBlock, currentBlock+1, currentBlock+2
// Voters can give agreement at block+2, block+3
let block;
let shouldPropose = false;
let shouldVote = false;
let shouldRequestConsensus = false;
if (currentBlockNumber % 4 === 0) {
block = currentBlockNumber;
shouldPropose = true;
} else if (currentBlockNumber % 4 === 1) {
block = currentBlockNumber - 1;
shouldRequestConsensus = true;
} else if (currentBlockNumber % 4 === 2) {
block = currentBlockNumber - 2;
shouldPropose = true;
shouldVote = true;
shouldRequestConsensus = true;
} else if (currentBlockNumber % 4 === 3) {
block = currentBlockNumber - 3;
shouldPropose = true;
}
// if worker has already worked, skip working and compute consensus
let currentVote =
await this.#api.query.stakingWorkers.answerbyBlockNumber([
block,
this.#account.address,
]);
currentVote = currentVote !== null ? currentVote.toJSON() : null;
// TODO: potentially increase shares if consensus hasn't been reached
if (currentVote != null) {
shouldPropose = false;
shouldVote = false;
await this.#computePriceConsensus(block, currentVote);
}
if (shouldPropose && isHighestStaker) {
try {
let priceData = await this.#fetchPriceData();
try {
const proposePrice = api.tx.stakingWorkers.proposePrices(
priceData,
block,
);
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await proposePrice.signAndSend(
account,
{ nonce },
({ status, events, dispatchError }) => {
if (dispatchError) {
if (dispatchError.isModule) {
const decoded = api.registry.findMetaError(
dispatchError.asModule,
);
const { docs, name, section } = decoded;
console.log(`${section}.${name}: ${docs.join(" ")}`);
} else {
console.log(dispatchError.toString());
}
}
},
);
} catch (error) {
console.log(error);
process.exit();
}
console.log("Fed in block ", currentBlockNumber);
console.log("Fed by ", account.address);
console.log("Price details", priceData);
} catch (error) {
console.log(error);
process.exit();
}
}
// TODO: Handle case where higher staker hasn't propose for a specific block and slash them
// If higher staker hasn't already voted, vote false
let proposedValues =
await this.#api.query.stakingWorkers.valuesbyBlockNumber(block);
proposedValues = proposedValues !== null ? proposedValues.toJSON() : null;
if (proposedValues == null) {
shouldVote = false;
}
if (shouldVote && !isHighestStaker) {
let votePref = true;
let priceData = await this.#fetchPriceData();
// 10% allowed difference
let allowedDifference = 10;
// Iterate through the price data
for (let i = 0; i < proposedValues.length; i++) {
//TODO: Should workers vote based on the proposed supply as well?
// Extract the second element from each sublist, which represents the price of each asset
let value1 = proposedValues[i][1];
let value2 = priceData[i][1];
// Calculate the percentage difference
let percentageDifference =
(BigInt(100) *
BigInt(
Math.abs(Number(BigInt(value1)) - Number(BigInt(value2))),
)) /
BigInt(value1);
// Check if the percentage difference is within the allowed range
// If it isn't, exit the loop and vote false on the proposed prices
if (percentageDifference > allowedDifference) {
votePref = false;
break;
}
}
try {
const giveAgreementExtrinsic = api.tx.stakingWorkers.giveAgreement(
block,
votePref,
);
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await giveAgreementExtrinsic.signAndSend(
account,
{ nonce },
({ status, events, dispatchError }) => {
if (dispatchError) {
if (dispatchError.isModule) {
const decoded = api.registry.findMetaError(
dispatchError.asModule,
);
const { docs, name, section } = decoded;
console.log(`${section}.${name}: ${docs.join(" ")}`);
} else {
console.log(dispatchError.toString());
}
}
},
);
console.log("Agreement given in block ", block);
} catch (error) {
console.log(error);
process.exit();
}
}
// Slash malicious workers
let lastValidatedBlockPrice =
await this.#api.query.stakingWorkers.lastBlockPricesValidated();
lastValidatedBlockPrice =
lastValidatedBlockPrice !== null
? lastValidatedBlockPrice.toJSON()
: null;
if (lastValidatedBlockPrice === 0) {
return;
} else if (currentBlockNumber - lastValidatedBlockPrice > 4) {
// TODO: When we pass lastValidateBlockPrice the punishments are not being processed
await this.#punishPriceWorkers(lastValidatedBlockPrice - 4);
}
});
}
async #punishPriceWorkers(block) {
// Fetch answer given by current worker
let currentVote = await this.#api.query.stakingWorkers.answerbyBlockNumber([
block,
this.#account.address,
]);
currentVote = currentVote !== null ? currentVote.toJSON() : null;
// If current worker hasn't work, skip punishment and continue working
if (currentVote == null) {
return;
}
const registeredWorkers =
await this.#api.query.stakingWorkers.registeredWorkers.keys();
// Iterate over the registered workers, and calculate whether consensus has been reached for `block`'s worker's vote
await Promise.all(
registeredWorkers.map(async (worker) => {
const accountId = worker.args[0].toString();
let workerInfo =
await this.#api.query.stakingWorkers.registeredWorkers(accountId);
workerInfo = workerInfo.toJSON();
// Retrieve worker response
let workerResponse =
await this.#api.query.stakingWorkers.answerbyBlockNumber([
block,
accountId,
]);
workerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
if (workerResponse !== null) {
// If worker has worked compare their vote with the current worker's vote
// if their work differs. punish them
// If their votes differs, punish worker for giving a wrong answer
if (currentVote != workerResponse) {
// Ensure malicious user hasn't been unregistered recently
if (
workerInfo.state === "Disable" &&
workerInfo.unregisterAt < block
) {
console.log("Worker has been recently unregistered");
return;
}
// Ensure malicious user hasn't been slashed for this price block before
let slashInfo =
await this.#api.query.stakingWorkers.slashInfoForPrices([
block,
accountId,
]);
slashInfo = slashInfo !== null ? slashInfo.toJSON() : null;
// If user has been slashed for this particular block, skip the iteration
if (slashInfo != null) {
return;
}
console.log("PUNISH MALICIOUS (Price)");
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.manageSlashingPrices(block, accountId)
.signAndSend(this.#account, { nonce });
}
} else {
// Ensure malicious user hasn't been unregistered recently
if (
workerInfo.state === "Disable" &&
workerInfo.unregisterAt < block
) {
console.log("Worker has been recently unregistered");
return;
}
// Ensure malicious user hasn't been slashed for this price block before
let slashInfo =
await this.#api.query.stakingWorkers.slashInfoForPrices([
block,
accountId,
]);
slashInfo = slashInfo !== null ? slashInfo.toJSON() : null;
// If user has been slashed for this particular block, skip the iteration
if (slashInfo !== null) {
return;
}
// Punish worker for not working
console.log(
"PUNISH NON WORKER (Price)",
accountId,
"for block ",
block,
);
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
try {
await this.#api.tx.stakingWorkers
.manageSlashingPrices(block, accountId)
.signAndSend(this.#account, { nonce });
} catch (error) {
console.error("Error sending the transaction", error);
}
}
}),
);
}
async #fetchPriceData() {
const { data: response } = await axios.get(process.env.CRYPTO_PRICE_API);
let stablesData = (await axios.get(process.env.FIAT_PRICE_API)).data.data;
let priceData = [];
feedAssetsConfig.map((asset) => {
let assetData = response.find(
(item) => item.id.toLowerCase() === asset.id.toLowerCase(),
);
if (assetData) {
const formatPrice = parseUnits(
assetData.current_price.toString(),
asset.decimals,
);
priceData.push([
asset.assetId,
formatPrice,
parseInt(assetData.circulating_supply),
]);
} else {
assetData = stablesData[asset.id];
const containsFree = process.env.CRYPTO_PRICE_API.includes("free");
if (assetData) {
let formatPrice;
if (containsFree) {
formatPrice = parseUnits(
(1 / assetData).toFixed(6).toString(),
asset.decimals,
);
} else {
formatPrice = parseUnits(
(1 / assetData.value).toFixed(6).toString(),
asset.decimals,
);
}
priceData.push([asset.assetId, formatPrice, 0]);
}
}
});
return priceData;
}
async #computePriceConsensus(block, vote) {
// Retrieve total shares that currently exist
let poolShares = await this.#api.query.stakingWorkers.poolShare();
const totalShares = poolShares.toNumber();
// Number of shares for the specific hash that this worker voted for
let txShares = 0;
const registeredWorkers =
await this.#api.query.stakingWorkers.registeredWorkers.keys();
// TODO: Should we factor in the "Disable" worker's shares?
// Iterate over the registered workers, and calculate whether consensus has been reached for `block`'s worker's vote
await Promise.all(
registeredWorkers.map(async (worker) => {
const accountId = worker.args[0].toString();
let workerInfo =
await this.#api.query.stakingWorkers.registeredWorkers(accountId);
workerInfo = workerInfo.toJSON();
if (workerInfo.state == "Disable") {
return;
}
// Retrieve worker response
let workerResponse =
await this.#api.query.stakingWorkers.answerbyBlockNumber([
block,
accountId,
]);
workerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
// If worker has worked compare their vote with the current worker's vote
if (workerResponse !== null) {
// If their votes are the same add worker's shares to `txShares`
if (vote === workerResponse) {
txShares = txShares + workerInfo.shares;
}
}
// else do nothing
}),
);
// Check whether consensus has been reached and validate price
const consensusReached = txShares / totalShares >= 0.8; // Check if shares >= 80% of total shares
if (consensusReached) {
// Set a pseudo-random time buffer to avoid network conjestion of txs and
// prevent workers from wasting their fees when trying to validate an already validated block-price.
let buffer = getRandomInt(4000);
setTimeout(async () => {
// If consensus hasn't been finalized yet, validate block-price
let priceResult =
await this.#api.query.stakingWorkers.consensusbyBlockNumber(block);
priceResult = priceResult !== null ? priceResult.toJSON() : null;
if (priceResult == null) {
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.requestConsensusPrices(block, vote)
.signAndSend(this.#account, { nonce });
console.log("REACHED + ACCEPTED (Prices)");
}
}, buffer);
}
}
async #updateWithdraws() {
this.#allPendingWithdraws = [];
const withdraws =
await this.#api.query.withdraw.executingTransactions.keys();
await Promise.all(
withdraws.map((withdraw) => {
const withdrawId = convertToInt(withdraw.toHuman()[0]);
return this.#api.query.withdraw
.withdrawIdToWithdraw(withdrawId)
.then((withdraw) => {
let withdrawJSON = withdraw.toHuman();
this.#allPendingWithdraws.push({
ID: convertToInt(withdrawJSON.withdrawId),
from: withdrawJSON.fromAccount,
to: withdrawJSON.withdrawToAccount,
amount: convertToInt(withdrawJSON.amount),
asset: convertToInt(withdrawJSON.assetId),
timestamp: convertToInt(withdrawJSON.timestamp),
});
// Update allWithdrawSlashes with new pending withdraws
let withdrawExists = this.#allWithdrawSlashes.some(
(resp) => resp.ID === convertToInt(withdrawJSON.withdrawId),
);
if (!withdrawExists) {
// Initialize punish_at_block to zero and update it once consnenus has been reached,
// which is the moment of initializing `canVoteUntil` field within the pallet.
this.#allWithdrawSlashes.push({
ID: convertToInt(withdrawJSON.withdrawId),
asset: convertToInt(withdrawJSON.assetId),
timestamp: convertToInt(withdrawJSON.timestamp),
punish_at_block: 0,
});
}
});
}),
);
console.log("WITHDRAWS UPDATED");
}
async #checkTransactionBetweenAddresses(toAddress, amount, timestamp) {
try {
const response = await fetch(
`${ETHERSCAN_API_URL}?module=account&action=txlist&address=${toAddress}&startblock=0&endblock=99999999&sort=asc&apikey=${ETHERSCAN_API_KEY}`,
);
if (!response.ok) {
return false;
}
const data = await response.json();
if (data.status !== "1") {
return false;
}
const now = Math.floor(Date.now() / 1000);
// Filter transactions between addressA and addressB
const transactionsBetweenAddresses = data.result.filter(
(transaction) =>
transaction.timeStamp > timestamp &&
now - transaction.timeStamp <= 3600,
);
// Take the last 10 transactions between addressA and addressB and reverse to get the oldest one in first position
const lastTenTransactions = transactionsBetweenAddresses
.slice(-10)
.reverse();
for (let transaction of lastTenTransactions) {
const amount_in_eth = formatEther(transaction.value);
const amount_in_blockchain = parseUnits(amount_in_eth, 10);
if (amount_in_blockchain == amount) {
if (
!this.#allResponses.some((resp) => resp.hash === transaction.hash)
)
return { hash: transaction.hash, timestamp: transaction.timeStamp };
}
}
} catch (error) {
console.error("Error in checkTransactionBetweenAddresses:", error);
}
return false;
}
async #punishWithdrawWorkers() {
this.#allWithdrawSlashes.sort((a, b) => a.timestamp - b.timestamp);
for (let withdraw of this.#allWithdrawSlashes) {
// Check if consensus has been finalized
// If consensus has not been finalized, skip punishment for this withdraw
let withdrawResult =
await this.#api.query.stakingWorkers.withdrawResultConsensus(
withdraw.ID,
);
withdrawResult = withdrawResult !== null ? withdrawResult.toJSON() : null;
if (withdrawResult === null) {
continue;
}
// Check if workers are allowed to punish
// If current block is smaller than `canVoteUntil` block, skip punishment for this withdraw
let withdrawInfo = await this.#api.query.withdraw.withdrawIdToWithdraw(
withdraw.ID,
);
withdrawInfo = withdrawInfo !== null ? withdrawInfo.toJSON() : null;
// Retrieve the latest header
const lastHeader = await this.#api.rpc.chain.getHeader();
// Retrieve current block
const lastBlock = lastHeader.number;
if (withdrawInfo.canVoteUntil > lastBlock.toNumber()) {
continue;
}
// Should workers validate based on their vote or the succesful one
// of `withdrawResultConsensus`
let workerResponse =
await this.#api.query.stakingWorkers.workerWithdrawResponse([
withdraw.ID,
this.#account.address,
]);
let currentWorkerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
if (currentWorkerResponse === null) {
// TODO: Should i keep this line - consider case where calling between these functions indefinitely
await this.#processWithdraws();
continue;
}
const registeredWorkers =
await this.#api.query.stakingWorkers.registeredWorkers.keys();
// TODO: Should we factor in the "Disable" worker's shares?
// Iterate over the registered workers, and calculate whether consensus has been reached for `withdraw.ID`'s worker's vote
await Promise.all(
registeredWorkers.map(async (worker) => {
const accountId = worker.args[0].toString();
let workerInfo =
await this.#api.query.stakingWorkers.registeredWorkers(accountId);
workerInfo = workerInfo.toJSON();
// Retrieve worker response
let workerResponse =
await this.#api.query.stakingWorkers.workerWithdrawResponse([
withdraw.ID,
accountId,
]);
workerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
if (workerResponse !== null) {
// If worker has worked compare their vote with the current worker's vote
// if their work differs. punish them
// If their votes differs, punish worker for giving a wrong answer
if (
(currentWorkerResponse.response === "Accepted" &&
workerResponse.response === "Accepted" &&
workerResponse.txHash !== currentWorkerResponse.txHash) ||
(currentWorkerResponse.response === "Accepted" &&
workerResponse.response === "Rejected") ||
(currentWorkerResponse.response === "Rejected" &&
workerResponse.response === "Accepted")
) {
// Ensure malicious user hasn't been unregistered recently
if (
workerInfo.state === "Disable" &&
workerInfo.unregisterAt < withdrawInfo.createdAt
) {
return;
}
// Ensure malicious user hasn't been slashed for this price block before
let slashInfo =
await this.#api.query.stakingWorkers.slashInfoForWithdraw([
withdraw.ID,
accountId,
]);
slashInfo = slashInfo !== null ? slashInfo.toJSON() : null;
// If user has been slashed for this particular block, skip the iteration
if (slashInfo !== null) {
return;
}
console.log("PUNISH MALICIOUS (Withdraw)");
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.slashWorkerForWithdraw(withdraw.ID, accountId)
.signAndSend(this.#account, { nonce });
// Remove withdraw from storage to avoid punishing multiple times
this.#allWithdrawSlashes = this.#allWithdrawSlashes.filter(
(entry) => entry.ID !== withdraw.ID,
);
}
} else {
// TODO: Add check whether worker has been punished to avoid spamming the network
// and wasting fees
// Ensure malicious user hasn't been unregistered recently
if (
workerInfo.state === "Disable" &&
workerInfo.unregisterAt < withdrawInfo.createdAt
) {
return;
}
// Ensure malicious user hasn't been slashed for this price block before
let slashInfo =
await this.#api.query.stakingWorkers.slashInfoForWithdraw([
withdraw.ID,
accountId,
]);
slashInfo = slashInfo !== null ? slashInfo.toJSON() : null;
// If user has been slashed for this particular block, skip the iteration
if (slashInfo !== null) {
return;
}
// Punish worker for not working
console.log("PUNISH NON WORKER (Withdraw)");
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.slashWorkerForWithdraw(withdraw.ID, accountId)
.signAndSend(this.#account, { nonce });
// Remove withdraw from storage to avoid punishing multiple times
this.#allWithdrawSlashes = this.#allWithdrawSlashes.filter(
(entry) => entry.ID !== withdraw.ID,
);
}
}),
);
}
}
async #processWithdraws() {
this.#allPendingWithdraws.sort((a, b) => a.timestamp - b.timestamp);
const now = Math.floor(Date.now() / 1000);
for (let withdraw of this.#allPendingWithdraws) {
let workerResponse =
await this.#api.query.stakingWorkers.workerWithdrawResponse([
withdraw.ID,
this.#account.address,
]);
workerResponse = workerResponse !== null ? workerResponse.toJSON() : null;
// If worker has already worked for this withdraw, calculate consensus and skip scanning
if (workerResponse !== null) {
await this.#computeWithdrawConsensus(withdraw.ID, workerResponse);
continue;
}
if (withdraw.asset === 2) {
// Scan external chain for the pending withdraw
const transactionExists = await this.#checkTransactionBetweenAddresses(
withdraw.to,
withdraw.amount,
withdraw.timestamp,
);
if (transactionExists) {
try {
// If tx exists pass the tx hash
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.voteForWithdraw(withdraw.ID, transactionExists.hash)
.signAndSend(this.#account, { nonce });
console.log('Voted "Accepted" for Withdraw: ', withdraw.ID);
this.#allResponses.push({
id: withdraw.ID,
hash: transactionExists.hash,
value: true,
});
} catch (error) {
console.error("Error sending the transaction", error);
}
// TODO: Decide the actual time buffer
} else if (now - withdraw.timestamp > 3600) {
try {
// If tx doesn't exist vote "Rejected" and pass no tx hash
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.voteForWithdraw(withdraw.ID, null)
.signAndSend(this.#account, { nonce });
console.log('Voted "Rejected" for Withdraw: ', withdraw.ID);
this.#allResponses.push({ id: withdraw.ID, value: false });
} catch (error) {
console.error("Error sending the transaction", error);
}
}
}
}
}
async #computeWithdrawConsensus(withdrawId, currentWorkerResponse) {
// TODO: Consider if it makes sense to include the following
// If consensus has been reached, search for malicious users and skip
let withdrawResult =
await this.#api.query.stakingWorkers.withdrawResultConsensus(withdrawId);
if (
withdrawResult.reponse === "Accepted" ||
withdrawResult.reponse === "Rejected"
) {
// await this.#punishWithdrawWorkers(withdrawId, withdrawResult);
return;
}
// Retrieve total shares that currently exist
let poolShares = await this.#api.query.stakingWorkers.poolShare();
const totalShares = poolShares.toNumber();
// Number of shares for the specific hash that this worker voted for
let txShares = 0;
const registeredWorkers =
await this.#api.query.stakingWorkers.registeredWorkers.keys();
// TODO: Should we factor in the "Disable" worker's shares?
// Iterate over the registered workers, and calculate whether consensus has been reached for `withdraw.ID`'s worker's vote
await Promise.all(
registeredWorkers.map(async (worker) => {
const accountId = worker.args[0].toString();
let workerInfo =
await this.#api.query.stakingWorkers.registeredWorkers(accountId);
workerInfo = workerInfo.toJSON();
if (workerInfo.state == "Disable") {
return;
}
// Retrieve worker response
let workerResponse =
await this.#api.query.stakingWorkers.workerWithdrawResponse([
withdrawId,
accountId,
]);
workerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
// If worker has worked compare their vote with the current worker's vote
if (workerResponse !== null) {
// If their votes are the same add worker's shares to `txShares`
if (
(currentWorkerResponse.response === "Accepted" &&
workerResponse.response === "Accepted" &&
workerResponse.txHash === currentWorkerResponse.txHash) ||
(currentWorkerResponse.response === "Rejected" &&
workerResponse.response === "Rejected")
) {
txShares = txShares + workerInfo.shares;
}
}
// else do nothing
}),
);
// Check whether consensus has been reached and validate withdraw
const consensusReached = txShares / totalShares >= 0.8; // Check if shares >= 80% of total shares
if (consensusReached && currentWorkerResponse.response === "Accepted") {
// Set a pseudo-random time buffer to avoid network conjestion of txs and
// prevent workers from wasting their fees when trying to validate an already validated withdraw.
let buffer = getRandomInt(19000);
setTimeout(async () => {
// If consensus hasn't been finalized yet, validate withdraw
let withdrawResult =
await this.#api.query.stakingWorkers.withdrawResultConsensus(
withdrawId,
);
withdrawResult =
withdrawResult !== null ? withdrawResult.toJSON() : null;
if (withdrawResult == null) {
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.setWithdrawResult(withdrawId, currentWorkerResponse.txHash)
.signAndSend(this.#account, { nonce });
console.log(`Consensus has been reached and accepted for withdraw ${withdrawId}`);
}
}, buffer);
} else if (
consensusReached &&
currentWorkerResponse.response === "Rejected"
) {
// Set a pseudo-random time buffer to avoid network conjestion of txs and
// prevent workers from wasting their fees when trying to validate an already validated withdraw.
let buffer = getRandomInt(19000);
setTimeout(async () => {
// If consensus hasn't been reached, validate withdraw
let withdrawResult =
await this.#api.query.stakingWorkers.withdrawResultConsensus(
withdrawId,
);
if (withdrawResult == null) {
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.setWithdrawResult(withdrawId, null)
.signAndSend(this.#account, { nonce });
console.log(`Consensus has been reached and rejected for Withdraw ${withdrawId}`);
}
}, buffer);
}
return false;
}
async #punishDepositWorkers() {
this.#allDepositSlashes.sort((a, b) => a.timestamp - b.timestamp);
for (let deposit of this.#allDepositSlashes) {
// Check if consensus has been finalized
// If consensus has not been finalized, skip punishment for this deposit
let depositResult =
await this.#api.query.stakingWorkers.depositResultConsensus(deposit.ID);
depositResult = depositResult !== null ? depositResult.toJSON() : null;
if (depositResult === null) {
continue;
}
// Check if workers are allowed to punish
// If current block is smaller than `canVoteUntil` block, skip punishment for this deposit
let depositInfo = await this.#api.query.deposit.depositIdToDeposit(
deposit.ID,
);
depositInfo = depositInfo !== null ? depositInfo.toJSON() : null;
// Retrieve the latest header
const lastHeader = await this.#api.rpc.chain.getHeader();
// Retrieve current block
const lastBlock = lastHeader.number;
if (depositInfo.canVoteUntil > lastBlock.toNumber()) {
continue;
}
// Should workers validate based on their vote or the succesful one
// of `depositResultConsensus`
let workerResponse =
await this.#api.query.stakingWorkers.workerDepositResponse([
deposit.ID,
this.#account.address,
]);
let currentWorkerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
if (currentWorkerResponse === null) {
await this.#processDeposits();
continue;
}
const registeredWorkers =
await this.#api.query.stakingWorkers.registeredWorkers.keys();
// TODO: Should we factor in the "Disable" worker's shares?
// Iterate over the registered workers, and calculate whether consensus has been reached for `deposit.ID`'s worker's vote
await Promise.all(
registeredWorkers.map(async (worker) => {
const accountId = worker.args[0].toString();
let workerInfo =
await this.#api.query.stakingWorkers.registeredWorkers(accountId);
workerInfo = workerInfo.toJSON();
// Retrieve worker response
let workerResponse =
await this.#api.query.stakingWorkers.workerDepositResponse([
deposit.ID,
accountId,
]);
workerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
if (workerResponse !== null) {
// If worker has worked compare their vote with the current worker's vote
// if their work differs. punish them
// If their votes differs, punish worker for giving a wrong answer
if (
(currentWorkerResponse.response === "Accepted" &&
workerResponse.response === "Accepted" &&
workerResponse.txHash !== currentWorkerResponse.txHash) ||
(currentWorkerResponse.response === "Accepted" &&
workerResponse.response === "Rejected") ||
(currentWorkerResponse.response === "Rejected" &&
workerResponse.response === "Accepted")
) {
// Ensure malicious user hasn't been unregistered recently
if (
workerInfo.state === "Disable" &&
workerInfo.unregisterAt < depositInfo.createdAt
) {
return;
}
// Ensure malicious user hasn't been slashed for this price block before
let slashInfo =
await this.#api.query.stakingWorkers.slashInfoForDeposit([
deposit.ID,
accountId,
]);
slashInfo = slashInfo !== null ? slashInfo.toJSON() : null;
// If user has been slashed for this particular block, skip the iteration
if (slashInfo !== null) {
return;
}
console.log(`Punish malicious user ${accountId} for deposit ${deposit.ID}`);
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.slashWorkerForDeposit(deposit.ID, accountId)
.signAndSend(this.#account, { nonce });
// Remove deposit from storage to avoid punishing multiple times
this.#allDepositSlashes = this.#allDepositSlashes.filter(
(entry) => entry.ID !== deposit.ID,
);
}
} else {
// TODO: Add check whether worker has been punished to avoid spamming the network
// and wasting fees
// Ensure malicious user hasn't been unregistered recently
if (
workerInfo.state === "Disable" &&
workerInfo.unregisterAt < depositInfo.createdAt
) {
return;
}
// Ensure malicious user hasn't been slashed for this price block before
let slashInfo =
await this.#api.query.stakingWorkers.slashInfoForDeposit([
deposit.ID,
accountId,
]);
slashInfo = slashInfo !== null ? slashInfo.toJSON() : null;
// If user has been slashed for this particular block, skip the iteration
if (slashInfo !== null) {
return;
}
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
// Punish worker for not working
console.log(`Punish non worker user ${accountId} for deposit ${deposit.ID}`);
await this.#api.tx.stakingWorkers
.slashWorkerForDeposit(deposit.ID, accountId)
.signAndSend(this.#account, { nonce });
// Remove deposit from storage to avoid punishing multiple times
this.#allDepositSlashes = this.#allDepositSlashes.filter(
(entry) => entry.ID !== deposit.ID,
);
}
}),
);
}
}
async #computeDepositConsensus(depositId, currentWorkerResponse) {
// TODO: Consider if it makes sense to include the following
// If consensus has been reached, search for malicious users and skip
let depositResult =
await this.#api.query.stakingWorkers.depositResultConsensus(depositId);
if (
depositResult.reponse === "Accepted" ||
depositResult.reponse === "Rejected"
) {
// await this.#punishDepositWorkers(depositId, depositResult);
return;
}
// Retrieve total shares that currently exist
let poolShares = await this.#api.query.stakingWorkers.poolShare();
const totalShares = poolShares.toNumber();
// Number of shares for the specific hash that this worker voted for
let txShares = 0;
const registeredWorkers =
await this.#api.query.stakingWorkers.registeredWorkers.keys();
// TODO: Should we factor in the "Disable" worker's shares?
// Iterate over the registered workers, and calculate whether consensus has been reached for `deposit.ID`'s worker's vote
await Promise.all(
registeredWorkers.map(async (worker) => {
const accountId = worker.args[0].toString();
let workerInfo =
await this.#api.query.stakingWorkers.registeredWorkers(accountId);
workerInfo = workerInfo.toJSON();
if (workerInfo.state == "Disable") {
return;
}
// Retrieve worker response
let workerResponse =
await this.#api.query.stakingWorkers.workerDepositResponse([
depositId,
accountId,
]);
workerResponse =
workerResponse !== null ? workerResponse.toJSON() : null;
// If worker has worked compare their vote with the current worker's vote
if (workerResponse !== null) {
// If their votes are the same add worker's shares to `txShares`
if (
(currentWorkerResponse.response === "Accepted" &&
workerResponse.response === "Accepted" &&
workerResponse.txHash === currentWorkerResponse.txHash) ||
(currentWorkerResponse.response === "Rejected" &&
workerResponse.response === "Rejected")
) {
txShares = txShares + workerInfo.shares;
}
}
// else do nothing
}),
);
// Check whether consensus has been reached and validate deposit
const consensusReached = txShares / totalShares >= 0.8; // Check if shares >= 80% of total shares
if (consensusReached && currentWorkerResponse.response === "Accepted") {
// Set a pseudo-random time buffer to avoid network conjestion of txs and
// prevent workers from wasting their fees when trying to validate an already validated deposit.
let buffer = getRandomInt(19000);
setTimeout(async () => {
// If consensus hasn't been finalized yet, validate deposit
let depositResult =
await this.#api.query.stakingWorkers.depositResultConsensus(
depositId,
);
depositResult = depositResult !== null ? depositResult.toJSON() : null;
if (depositResult == null) {
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.setDepositResult(depositId, currentWorkerResponse.txHash)
.signAndSend(this.#account, { nonce });
console.log(`Consensus has been reached and accepted for deposit ${depositId}`);
}
}, buffer);
} else if (
consensusReached &&
currentWorkerResponse.response === "Rejected"
) {
// Set a pseudo-random time buffer to avoid network conjestion of txs and
// prevent workers from wasting their fees when trying to validate an already validated deposit.
let buffer = getRandomInt(19000);
setTimeout(async () => {
// If consensus hasn't been reached, validate deposit
let depositResult =
await this.#api.query.stakingWorkers.depositResultConsensus(
depositId,
);
if (depositResult == null) {
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
await this.#api.tx.stakingWorkers
.setDepositResult(depositId, null)
.signAndSend(this.#account, { nonce });
console.log(`Consensus has been reached and rejected for deposit ${depositId}`);
}
}, buffer);
}
return false;
}
async #processDeposits() {
this.#allPendingDeposits.sort((a, b) => a.timestamp - b.timestamp);
const now = Math.floor(Date.now() / 1000);
for (let deposit of this.#allPendingDeposits) {
let workerResponse =
await this.#api.query.stakingWorkers.workerDepositResponse([
deposit.ID,
this.#account.address,
]);
workerResponse = workerResponse !== null ? workerResponse.toJSON() : null;
// If worker has already worked for this deposit, calculate consensus and skip scanning
if (workerResponse !== null) {
await this.#computeDepositConsensus(deposit.ID, workerResponse);
continue;
}
const Responses = {
Accepted: "Accepted",
Rejected: "Rejected",
};
if (deposit.asset === 2) {
// Scan external chain for the pending deposit
const transactionExists = await this.#checkTransactionBetweenAddresses(
deposit.to,
deposit.amount,
deposit.timestamp,
);
if (transactionExists) {
try {
// If tx exists vote "Accepted" and pass the tx hash
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
let deposit_result = Responses.Accepted;
await this.#api.tx.stakingWorkers
.voteForDeposit(
deposit.ID,
deposit_result,
transactionExists.hash,
)
.signAndSend(this.#account, { nonce });
console.log('Voted "Accepted" for Deposit: ', deposit.ID);
// TODO: Delete the following?
this.#allResponses.push({
id: deposit.ID,
hash: transactionExists.hash,
value: true,
});
} catch (error) {
console.error("Error sending the transaction", error);
}
} else if (now - deposit.timestamp > 3600) {
try {
// If tx doesn't exist vote "Rejected" and pass no tx hash
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
let deposit_result = Responses.Rejected;
await this.#api.tx.stakingWorkers
.voteForDeposit(deposit.ID, deposit_result, null)
.signAndSend(this.#account, { nonce });
console.log('Voted "Rejected" for Deposit: ', deposit.ID);
this.#allResponses.push({ id: deposit.ID, value: false });
} catch (error) {
let nonce = await this.#api.rpc.system.accountNextIndex(
this.#account.address,
);
let deposit_result = Responses.Rejected;
await this.#api.tx.stakingWorkers
.voteForDeposit(deposit.ID, deposit_result, null)
.signAndSend(this.#account, { nonce });
console.error("Error sending the transaction", error);
}
}
}
}
}
async #updateDeposits() {
this.#allPendingDeposits = [];
const deposits = await this.#api.query.deposit.pendingTransactions.keys();
await Promise.all(
deposits.map((deposit) => {
const depositId = convertToInt(deposit.toHuman()[0]);
return this.#api.query.deposit
.depositIdToDeposit(depositId)
.then((deposit) => {
let depositJSON = deposit.toHuman();
this.#allPendingDeposits.push({
ID: convertToInt(depositJSON.depositId),
to: depositJSON.depositToAccount,
amount: convertToInt(depositJSON.amount),
asset: convertToInt(depositJSON.tokenId),
timestamp: convertToInt(depositJSON.timestamp),
});
// Update allDepositSlashes with new pending deposits
let depositExists = this.#allDepositSlashes.some(
(resp) => resp.ID === convertToInt(depositJSON.depositId),
);
if (!depositExists) {
// Initialize punish_at_block to zero and update it once consnenus has been reached,
// which is the moment of initializing `canVoteUntil` field within the pallet.
this.#allDepositSlashes.push({
ID: convertToInt(depositJSON.depositId),
asset: convertToInt(depositJSON.tokenId),
timestamp: convertToInt(depositJSON.timestamp),
punish_at_block: 0,
});
}
});
}),
);
console.log("DEPOSITS UPDATED");
}
async #workerEnginewithdraw() {
await this.#updateWithdraws();
await this.#processWithdraws();
await this.#punishWithdrawWorkers();
}
async #workerEnginedeposit() {
await this.#updateDeposits();
await this.#processDeposits();
await this.#punishDepositWorkers();
}
async workerStart() {
if(this.#workerOn) {
setInterval(() => this.#workerEnginedeposit(), 10000); // Every 1 minute
setInterval(() => this.#workerEnginewithdraw(), 10000); // Every 1 minute
this.#oracleWorker(this.#api, this.#account);
console.log()
}
}
async workerStop() {
this.#workerOn = false;
}
}
export default OffchainWorker;
// Define the main function
async function main() {
const provider = new WsProvider(process.env.HOST || "http://127.0.0.1:9933");
// Create the API and wait until ready
const api = await ApiPromise.create({ provider });
let userAccount = await generateAccount(
`${process.env.EMAIL}//${process.env.PASSWORD}//${process.env.PIN}`,
);
try {
// Initialize OffchainWorker
const offchainWorker = new OffchainWorker(api, userAccount);
// Call workerStart method
await offchainWorker.workerStart();
// Any other initialization or logic can go here
} catch (error) {
console.error("An error occurred:", error);
process.exit(1); // Exit with non-zero status code to indicate failure
}
}
// Execute the main function
main();