From 99b74e6623f675194e3ea4b924db7569d6c0fb0a Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 2 Jun 2025 02:15:41 -0600 Subject: [PATCH 01/38] rolling window rate limiter --- python/ccxt/async_support/base/throttler.py | 1 + python/ccxt/base/exchange.py | 10 +++- ts/src/base/Exchange.ts | 10 +++- ts/src/base/functions/throttle.ts | 64 +++++++++++++++++++-- ts/src/bitget.ts | 1 + 5 files changed, 80 insertions(+), 6 deletions(-) diff --git a/python/ccxt/async_support/base/throttler.py b/python/ccxt/async_support/base/throttler.py index e067eddbfc0eb..0d0e799bafb79 100644 --- a/python/ccxt/async_support/base/throttler.py +++ b/python/ccxt/async_support/base/throttler.py @@ -13,6 +13,7 @@ def __init__(self, config, loop=None): 'tokens': 0, 'maxCapacity': 2000, 'capacity': 1.0, + # maxWeight should be set in config parameter for rolling window algorithm } self.config.update(config) self.queue = collections.deque() diff --git a/python/ccxt/base/exchange.py b/python/ccxt/base/exchange.py index 468f0e1518a82..7b057335fdb68 100644 --- a/python/ccxt/base/exchange.py +++ b/python/ccxt/base/exchange.py @@ -167,6 +167,11 @@ class Exchange(object): timeframes = {} tokenBucket = None + # rate limiter properties + rateLimiterAlgorithm = 'leakyBucket' # rollingWindow or leakyBucket + maxLimiterRequests = 1000 + rollingWindowSize = 60000 + fees = { 'trading': { 'tierBased': None, @@ -2863,8 +2868,11 @@ def init_rest_rate_limiter(self): 'delay': 0.001, 'capacity': 1, 'cost': 1, - 'maxCapacity': 1000, + 'maxLimiterRequests': self.maxLimiterRequests, 'refillRate': refillRate, + 'algorithm': self.rateLimiterAlgorithm, + 'windowSize': self.rollingWindowSize, + 'maxWeight': self.rollingWindowSize / self.rateLimit, # ms_of_window / ms_of_rate_limit } existingBucket = {} if (self.tokenBucket is None) else self.tokenBucket self.tokenBucket = self.extend(defaultBucket, existingBucket) diff --git a/ts/src/base/Exchange.ts b/ts/src/base/Exchange.ts index 60f62e5585e02..e80329c68395d 100644 --- a/ts/src/base/Exchange.ts +++ b/ts/src/base/Exchange.ts @@ -335,6 +335,11 @@ export default class Exchange { throttler = undefined enableRateLimit: boolean = undefined; + // rate limiter properties + rateLimiterAlgorithm: string = 'leakyBucket'; // rollingWindow or leakyBucket + maxLimiterRequests: Num = 1000; + rollingWindowSize: Num = 60000; + httpExceptions = undefined limits: { @@ -2852,8 +2857,11 @@ export default class Exchange { 'delay': 0.001, 'capacity': 1, 'cost': 1, - 'maxCapacity': 1000, + 'maxLimiterRequests': this.maxLimiterRequests, 'refillRate': refillRate, + 'algorithm': this.rateLimiterAlgorithm, + 'windowSize': this.rollingWindowSize, + 'maxWeight': this.rollingWindowSize / this.rateLimit, // ms_of_window / ms_of_rate_limit }; const existingBucket = (this.tokenBucket === undefined) ? {} : this.tokenBucket; this.tokenBucket = this.extend (defaultBucket, existingBucket); diff --git a/ts/src/base/functions/throttle.ts b/ts/src/base/functions/throttle.ts index 180a267e83aba..f6135aef2c8c7 100644 --- a/ts/src/base/functions/throttle.ts +++ b/ts/src/base/functions/throttle.ts @@ -6,21 +6,42 @@ import { now, sleep } from './time.js'; /* ------------------------------------------------------------------------ */ class Throttler { + + running: boolean; + queue: { resolver: any; cost: number }[]; + config: { + refillRate: number; + delay: number; + capacity: number; + maxLimiterRequests: number; + tokens: number; + cost: number; + algorithm: string; + rateLimit: number; + windowSize: number; + maxWeight: number; + }; + timestamps: { timestamp: number; cost: number }[]; + constructor (config) { this.config = { 'refillRate': 1.0, 'delay': 0.001, 'capacity': 1.0, - 'maxCapacity': 2000, + 'maxLimiterRequests': 2000, 'tokens': 0, 'cost': 1.0, + 'algorithm': 'leakyBucket', + 'windowSize': 60000.0, + // maxWeight should be set in config parameter for rolling window algorithm }; Object.assign (this.config, config); this.queue = []; this.running = false; + this.timestamps = []; } - async loop () { + async leakyBucketLoop () { let lastTimestamp = now (); while (this.running) { const { resolver, cost } = this.queue[0]; @@ -44,13 +65,48 @@ class Throttler { } } + async rollingWindowLoop() { + while (this.running) { + const { resolver, cost } = this.queue[0]; + const nowTime = now (); + this.timestamps = this.timestamps.filter (t => nowTime - t.timestamp < this.config.windowSize); // Remove timestamps outside the rolling window + const totalCost = this.timestamps.reduce ((sum, t) => sum + t.cost, 0); // Calculate the total cost of requests still in the window + if (totalCost + cost <= this.config.maxWeight) { + // Enough capacity, proceed with request + this.timestamps.push ({ timestamp: nowTime, cost }); + resolver (); + this.queue.shift (); + await Promise.resolve (); // Yield control to event loop + if (this.queue.length === 0) { + this.running = false; + } + } else { + // Calculate the wait time until the oldest request expires + const earliestRequestTime = this.timestamps[0].timestamp; + const waitTime = (earliestRequestTime + this.config.windowSize) - nowTime; + // Ensure waitTime is positive before sleeping + if (waitTime > 0) { + await sleep (waitTime); + } + } + } + } + + async loop () { + if (this.config['rateLimiterAlogorithm'] === 'leakyBucket') { + await this.leakyBucketLoop (); + } else { + await this.rollingWindowLoop (); + } + } + throttle (cost = undefined) { let resolver; const promise = new Promise ((resolve, reject) => { resolver = resolve; }); - if (this.queue.length > this.config['maxCapacity']) { - throw new Error ('throttle queue is over maxCapacity (' + this.config['maxCapacity'].toString () + '), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526'); + if (this.queue.length > this.config['maxLimiterRequests']) { + throw new Error ('throttle queue is over maxLimiterRequests (' + this.config['maxLimiterRequests'].toString () + '), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526'); } cost = (cost === undefined) ? this.config['cost'] : cost; this.queue.push ({ resolver, cost }); diff --git a/ts/src/bitget.ts b/ts/src/bitget.ts index 1a0bcdd1b991f..fb1928de57c4b 100644 --- a/ts/src/bitget.ts +++ b/ts/src/bitget.ts @@ -1550,6 +1550,7 @@ export default class bitget extends Exchange { // fiat currencies on deposit page 'fiatCurrencies': [ 'EUR', 'VND', 'PLN', 'CZK', 'HUF', 'DKK', 'AUD', 'CAD', 'NOK', 'SEK', 'CHF', 'MXN', 'COP', 'ARS', 'GBP', 'BRL', 'UAH', 'ZAR' ], }, + 'rollingWindowSize': 1000, 'features': { 'spot': { 'sandbox': true, From d08ed7a0c1705a2b308ee1ab250061f9ed23c054 Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 2 Jun 2025 03:10:53 -0600 Subject: [PATCH 02/38] rollingWindowRateLimiter for other languages --- cs/ccxt/base/Exchange.BaseMethods.cs | 8 ++- cs/ccxt/base/Exchange.Options.cs | 3 + cs/ccxt/base/Throttler.cs | 64 ++++++++++++++++-- go/v4/exchange.go | 3 + go/v4/exchange_throttler.go | 74 ++++++++++++++++++++- php/Exchange.php | 9 ++- php/async/Throttler.php | 58 ++++++++++++++-- python/ccxt/async_support/base/throttler.py | 41 ++++++++++-- 8 files changed, 241 insertions(+), 19 deletions(-) diff --git a/cs/ccxt/base/Exchange.BaseMethods.cs b/cs/ccxt/base/Exchange.BaseMethods.cs index 91180c92ad3ab..5f7f77f87b1f4 100644 --- a/cs/ccxt/base/Exchange.BaseMethods.cs +++ b/cs/ccxt/base/Exchange.BaseMethods.cs @@ -14,6 +14,9 @@ public virtual object describe() { "countries", null }, { "enableRateLimit", true }, { "rateLimit", 2000 }, + { "rateLimiterAlgorithm", "leakyBucket" }, // rollingWindow or leakyBucket + { "maxLimiterRequests", 1000 }, + { "rollingWindowSize", 60000 }, { "timeout", this.timeout }, { "certified", false }, { "pro", false }, @@ -1439,8 +1442,11 @@ public virtual void initRestRateLimiter() { "delay", 0.001 }, { "capacity", 1 }, { "cost", 1 }, - { "maxCapacity", 1000 }, + { "maxLimiterRequests", this.maxLimiterRequests }, { "refillRate", refillRate }, + { "algorithm", this.rateLimiterAlgorithm }, + { "windowSize", this.rollingWindowSize }, + { "maxWeight", this.rollingWindowSize / this.rateLimit }, // ms_of_window / ms_of_rate_limit }; object existingBucket = ((bool) isTrue((isEqual(this.tokenBucket, null)))) ? new Dictionary() {} : this.tokenBucket; this.tokenBucket = this.extend(defaultBucket, existingBucket); diff --git a/cs/ccxt/base/Exchange.Options.cs b/cs/ccxt/base/Exchange.Options.cs index bce4298ef615c..3bd0e196bef38 100644 --- a/cs/ccxt/base/Exchange.Options.cs +++ b/cs/ccxt/base/Exchange.Options.cs @@ -17,6 +17,9 @@ public partial class Exchange public string userAgent { get; set; } public bool verbose { get; set; } = true; public bool enableRateLimit { get; set; } = true; + public string rateLimiterAlgorithm {get; set; } = "leakyBucket"; // rollingWindow or leakyBucket + public int maxLimiterRequests {get; set; } = 1000; + public int rollingWindowSize {get; set; } = 60000; public long lastRestRequestTimestamp { get; set; } = 0; public string url { get; set; } = ""; diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 32b5ab2ed42ee..12612af8e9064 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -10,6 +10,7 @@ public class Throttler private Queue<(Task, double)> queue = new Queue<(Task, double)>(); private bool running = false; + private List<(long timestamp, double cost)> timestamps = new List<(long, double)>(); public Throttler(dict config) { @@ -19,14 +20,18 @@ public Throttler(dict config) {"delay", 0.001}, {"cost", 1.0}, {"tokens", 0}, - {"maxCapacity", 2000}, + {"maxLimiterRequests", 2000}, {"capacity", 1.0}, + {"algorithm", "leakyBucket"}, + {"rateLimit", 0.0}, + {"windowSize", 60000.0}, + // maxWeight should be set in config parameter for rolling window algorithm }; this.config = extend(this.config, config); } - private async Task loop() + private async Task leakyBucketLoop() { var lastTimestamp = milliseconds(); while (this.running) @@ -73,13 +78,62 @@ private async Task loop() } - public async Task throttle(object cost2) + private async Task rollingWindowLoop() { + while (this.running) + { + if (this.queue.Count == 0) + { + this.running = false; + continue; + } + var first = this.queue.Peek(); + var task = first.Item1; + var cost = first.Item2; + var now = milliseconds(); + timestamps = timestamps.Where(t => now - t.timestamp < Convert.ToDouble(this.config["windowSize"])).ToList(); + var totalCost = timestamps.Sum(t => t.cost); + if (totalCost + cost <= Convert.ToDouble(this.config["maxWeight"])) + { + timestamps.Add((now, cost)); + await Task.Delay(0); + if (task != null && task.Status == TaskStatus.Created) + { + task.Start(); + } + this.queue.Dequeue(); + if (this.queue.Count == 0) this.running = false; + } + else + { + var earliest = timestamps[0].timestamp; + var waitTime = (earliest + Convert.ToDouble(this.config["windowSize"])) - now; + if (waitTime > 0) + { + await Task.Delay((int)waitTime); + } + } + } + } + private async Task loop() + { + if (this.config["algorithm"].ToString() == "leakyBucket") + { + await leakyBucketLoop(); + } + else + { + await rollingWindowLoop(); + } + } + + public async Task throttle(object cost2) + { var cost = (cost2 != null) ? Convert.ToDouble(cost2) : Convert.ToDouble(this.config["cost"]); - if (this.queue.Count > (int)this.config["maxCapacity"]) + if (this.queue.Count > (int)this.config["maxLimiterRequests"]) { - throw new Exception("throttle queue is over maxCapacity (" + this.config["maxCapacity"].ToString() + "), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526"); + throw new Exception("throttle queue is over maxLimiterRequests (" + this.config["maxLimiterRequests"].ToString() + "), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526"); } var t = new Task(() => { }); this.queue.Enqueue((t, cost)); diff --git a/go/v4/exchange.go b/go/v4/exchange.go index 30a3df5091191..9bf026288a8cf 100644 --- a/go/v4/exchange.go +++ b/go/v4/exchange.go @@ -53,6 +53,9 @@ type Exchange struct { Verbose bool UserAgent string EnableRateLimit bool + RateLimiterAlgorithm string // rollingWindow or leakyBucket + MaxLimiterRequests int + RollingWindowSize int Url string Hostname string BaseCurrencies map[string]interface{} diff --git a/go/v4/exchange_throttler.go b/go/v4/exchange_throttler.go index 8ba5bc942e5a6..453cdea4615e6 100644 --- a/go/v4/exchange_throttler.go +++ b/go/v4/exchange_throttler.go @@ -6,10 +6,16 @@ import ( u "github.com/google/uuid" ) +type TimestampedCost struct { + Timestamp int64 + Cost float64 +} + type Throttler struct { Queue Queue Running bool Config map[string]interface{} + Timestamps []TimestampedCost } func NewThrottler(config map[string]interface{}) Throttler { @@ -17,15 +23,20 @@ func NewThrottler(config map[string]interface{}) Throttler { "refillRate": 1.0, "delay": 0.001, "capacity": 1.0, - "maxCapacity": 2000, + "maxLimiterRequests": 2000, "tokens": 0, "cost": 1.0, + "algorithm": "leakyBucket", + "rateLimit": 0.0, + "windowSize": 60000.0, + "maxWeight": 0.0, } return Throttler{ Queue: NewQueue(), Running: false, Config: ExtendMap(defaultConfig, config), + Timestamps: []TimestampedCost{}, } } @@ -49,13 +60,17 @@ func (t *Throttler) Throttle(cost2 interface{}) <-chan bool { if !t.Running { t.Running = true - go t.Loop() + if t.Config["algorithm"] == "leakyBucket" { + go t.leakyBucketLoop() + } else { + go t.rollingWindowLoop() + } } return task } -func (t *Throttler) Loop() { +func (t *Throttler) leakyBucketLoop() { lastTimestamp := Milliseconds() for t.Running { @@ -95,6 +110,59 @@ func (t *Throttler) Loop() { } +func (t *Throttler) rollingWindowLoop() { + for t.Running { + if t.Queue.IsEmpty() { + t.Running = false + continue + } + first, _ := t.Queue.Peek() + task := first.Task + cost := first.Cost + now := Milliseconds() + windowSize := ToFloat64(t.Config["windowSize"]) + maxWeight := ToFloat64(t.Config["maxWeight"]) + + t.Timestamps = filterTimestamps(t.Timestamps, now, windowSize) + totalCost := sumCosts(t.Timestamps) + + if totalCost+cost <= maxWeight { + t.Timestamps = append(t.Timestamps, TimestampedCost{Timestamp: now, Cost: cost}) + if task != nil { + task <- true + close(task) + } + t.Queue.Dequeue() + if t.Queue.IsEmpty() { + t.Running = false + } + } else { + waitTime := int64((t.Timestamps[0].Timestamp + int64(windowSize)) - now) + if waitTime > 0 { + time.Sleep(time.Duration(waitTime) * time.Millisecond) + } + } + } +} + +func filterTimestamps(timestamps []TimestampedCost, now int64, windowSize float64) []TimestampedCost { + result := []TimestampedCost{} + for _, t := range timestamps { + if float64(now-t.Timestamp) < windowSize { + result = append(result, t) + } + } + return result +} + +func sumCosts(timestamps []TimestampedCost) float64 { + sum := 0.0 + for _, t := range timestamps { + sum += t.Cost + } + return sum +} + func Milliseconds() int64 { return time.Now().UnixNano() / int64(time.Millisecond) } diff --git a/php/Exchange.php b/php/Exchange.php index a778138e1d300..9e0116fd50b58 100644 --- a/php/Exchange.php +++ b/php/Exchange.php @@ -300,6 +300,10 @@ class Exchange { public $restRequestQueue = null; public $restPollerLoopIsRunning = false; public $enableRateLimit = true; + // rate limiter properties + public $rateLimiterAlgorithm = 'leakyBucket'; // rollingWindow or leakyBucket + public $maxLimiterRequests = 1000; + public $rollingWindowSize = 60000; public $enableLastJsonResponse = false; public $enableLastHttpResponse = true; public $enableLastResponseHeaders = true; @@ -3430,8 +3434,11 @@ public function init_rest_rate_limiter() { 'delay' => 0.001, 'capacity' => 1, 'cost' => 1, - 'maxCapacity' => 1000, + 'maxLimiterRequests' => 1000, 'refillRate' => $refillRate, + 'algorithm' => $this->rateLimiterAlgorithm, + 'windowSize' => $this->rollingWindowSize, + 'maxWeight' => $this->rollingWindowSize / $this->rateLimit, // ms_of_window / ms_of_rate_limit ); $existingBucket = ($this->tokenBucket === null) ? array() : $this->tokenBucket; $this->tokenBucket = $this->extend($defaultBucket, $existingBucket); diff --git a/php/async/Throttler.php b/php/async/Throttler.php index cafdf8d8b018a..a0f012fdd064c 100644 --- a/php/async/Throttler.php +++ b/php/async/Throttler.php @@ -11,6 +11,7 @@ class Throttler { public $config; public $queue; public $running; + public $timestamps; public function __construct($config) { $this->config = array_merge(array( @@ -18,14 +19,19 @@ public function __construct($config) { 'delay' => 0.001, 'cost' => 1.0, 'tokens' => 0.0, - 'maxCapacity' => 2000, + 'maxLimiterRequests' => 2000, 'capacity' => 1.0, + 'algorithm' => 'leakyBucket', + 'rateLimit' => 0, + 'windowSize' => 60000.0, + 'maxWeight' => 0 ), $config); $this->queue = new \SplQueue(); $this->running = false; + $this->timestamps = array(); } - public function loop() { + public function leakyBucketLoop() { return Async\async(function () { $last_timestamp = microtime(true) * 1000.0; while ($this->running) { @@ -57,11 +63,55 @@ public function loop() { }) (); } + public function rollingWindowLoop() { + return Async\async(function () { + while ($this->running) { + list($future, $cost) = $this->queue->bottom(); + $cost = $cost ? $cost : $this->config['cost']; + $now = microtime(true) * 1000.0; + $this->timestamps = array_values(array_filter($this->timestamps, function ($entry) use ($now) { + return ($now - $entry['timestamp']) < $this->config['windowSize']; + })); + $total_cost = array_reduce($this->timestamps, function ($sum, $entry) { + return $sum + $entry['cost']; + }, 0); + if ($total_cost + $cost <= $this->config['maxWeight']) { + $this->timestamps[] = array('timestamp' => $now, 'cost' => $cost); + $future->resolve(null); + $this->queue->dequeue(); + # context switch? + # yield 0; + if ($this->queue->count() === 0) { + $this->running = false; + } + } else { + $earliest = $this->timestamps[0]['timestamp']; + $wait_time = ($earliest + $this->config['windowSize']) - $now; + if ($wait_time > 0) { + $sleep = new Promise(function ($resolve) use ($wait_time) { + Loop::addTimer($wait_time / 1000, function () use ($resolve) { + $resolve(null); + }); + }); + Async\await($sleep); + } + } + } + }) (); + } + + public function loop() { + if ($this->config['algorithm'] === 'leakyBucket') { + return $this->leakyBucketLoop(); + } else { + return $this->rollingWindowLoop(); + } + } public function __invoke($cost = null) { $future = new Deferred(); - if ($this->queue->count() > $this->config['maxCapacity']) { - throw new \RuntimeException('throttle queue is over maxCapacity (' . strval($this->config['maxCapacity']) . '), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526'); + if ($this->queue->count() > $this->config['maxLimiterRequests']) { + throw new \RuntimeException('throttle queue is over maxLimiterRequests (' . strval($this->config['maxLimiterRequests']) . '), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526'); } $this->queue->enqueue(array($future, $cost)); if (!$this->running) { diff --git a/python/ccxt/async_support/base/throttler.py b/python/ccxt/async_support/base/throttler.py index 0d0e799bafb79..666f17ff8cf3d 100644 --- a/python/ccxt/async_support/base/throttler.py +++ b/python/ccxt/async_support/base/throttler.py @@ -11,15 +11,19 @@ def __init__(self, config, loop=None): 'delay': 0.001, 'cost': 1.0, 'tokens': 0, - 'maxCapacity': 2000, + 'maxLimiterRequests': 2000, 'capacity': 1.0, - # maxWeight should be set in config parameter for rolling window algorithm + 'algorithm': 'leakyBucket', + 'rateLimit': 0, + 'windowSize': 60000.0, + 'maxWeight': 0 } self.config.update(config) self.queue = collections.deque() self.running = False + self.timestamps = [] - async def looper(self): + async def leaky_bucket_loop(self): last_timestamp = time() * 1000 while self.running: future, cost = self.queue[0] @@ -40,10 +44,37 @@ async def looper(self): last_timestamp = now self.config['tokens'] = min(self.config['tokens'] + elapsed * self.config['refillRate'], self.config['capacity']) + async def rolling_window_loop(self): + while self.running: + future, cost = self.queue[0] + cost = self.config['cost'] if cost is None else cost + now = time() * 1000 + self.timestamps = [t for t in self.timestamps if now - t['timestamp'] < self.config['windowSize']] + total_cost = sum(t['cost'] for t in self.timestamps) + if total_cost + cost <= self.config['maxWeight']: + self.timestamps.append({'timestamp': now, 'cost': cost}) + if not future.done(): + future.set_result(None) + self.queue.popleft() + # context switch + await asyncio.sleep(0) + if not self.queue: + self.running = False + else: + wait_time = (self.timestamps[0]['timestamp'] + self.config['windowSize']) - now + if wait_time > 0: + await asyncio.sleep(wait_time / 1000) + + async def looper(self): + if self.config.get('rateLimiterAlogorithm', 'leakyBucket') == 'leakyBucket': + await self.leaky_bucket_loop() + else: + await self.rolling_window_loop() + def __call__(self, cost=None): future = asyncio.Future() - if len(self.queue) > self.config['maxCapacity']: - raise RuntimeError('throttle queue is over maxCapacity (' + str(int(self.config['maxCapacity'])) + '), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526') + if len(self.queue) > self.config['maxLimiterRequests']: + raise RuntimeError('throttle queue is over maxLimiterRequests (' + str(int(self.config['maxLimiterRequests'])) + '), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526') self.queue.append((future, cost)) if not self.running: self.running = True From c21347f1853b64445392c45520b7bab07c07106b Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 9 Jun 2025 23:21:20 -0600 Subject: [PATCH 03/38] C# rate limiter fix made the queue thread safe --- cs/ccxt/base/Exchange.Options.cs | 3 ++ cs/ccxt/base/Throttler.cs | 74 +++++++++++++++++++++----------- ts/src/base/Exchange.ts | 3 ++ 3 files changed, 55 insertions(+), 25 deletions(-) diff --git a/cs/ccxt/base/Exchange.Options.cs b/cs/ccxt/base/Exchange.Options.cs index 3bd0e196bef38..5099aca453223 100644 --- a/cs/ccxt/base/Exchange.Options.cs +++ b/cs/ccxt/base/Exchange.Options.cs @@ -293,5 +293,8 @@ void initializeProperties(dict userConfig = null) this.newUpdates = SafeValue(extendedProperties, "newUpdates") as bool? ?? true; this.accounts = SafeValue(extendedProperties, "accounts") as List; this.features = SafeValue(extendedProperties, "features", features) as dict; + this.rateLimiterAlgorithm = SafeString(extendedProperties, "rateLimiterAlgorithm", "leakyBucket"); + this.maxLimiterRequests = (int)(SafeInteger(extendedProperties, "maxLimiterRequests") ?? 1000); + this.rollingWindowSize = (int)(SafeInteger(extendedProperties, "rollingWindowSize") ?? 60000); } } diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 12612af8e9064..c8f4e03d9b1a7 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -8,6 +8,7 @@ public class Throttler private dict config = new dict(); private Queue<(Task, double)> queue = new Queue<(Task, double)>(); + private readonly object queueLock = new object(); private bool running = false; private List<(long timestamp, double cost)> timestamps = new List<(long, double)>(); @@ -37,12 +38,19 @@ private async Task leakyBucketLoop() while (this.running) { // do we need this check here? - if (this.queue.Count == 0) + lock (queueLock) { - this.running = false; - continue; + if (this.queue.Count == 0) + { + this.running = false; + continue; + } + } + (Task, double) first; + lock (queueLock) + { + first = this.queue.Peek(); } - var first = this.queue.Peek(); var task = first.Item1; var cost = first.Item2; var tokensAsString = Convert.ToString(this.config["tokens"], CultureInfo.InvariantCulture); @@ -58,11 +66,14 @@ private async Task leakyBucketLoop() task.Start(); } } - this.queue.Dequeue(); - - if (this.queue.Count == 0) + lock (queueLock) { - this.running = false; + this.queue.Dequeue(); + + if (this.queue.Count == 0) + { + this.running = false; + } } } else @@ -82,12 +93,19 @@ private async Task rollingWindowLoop() { while (this.running) { - if (this.queue.Count == 0) + lock (queueLock) + { + if (this.queue.Count == 0) + { + this.running = false; + continue; + } + } + (Task, double) first; + lock (queueLock) { - this.running = false; - continue; + first = this.queue.Peek(); } - var first = this.queue.Peek(); var task = first.Item1; var cost = first.Item2; var now = milliseconds(); @@ -101,8 +119,11 @@ private async Task rollingWindowLoop() { task.Start(); } - this.queue.Dequeue(); - if (this.queue.Count == 0) this.running = false; + lock (queueLock) + { + this.queue.Dequeue(); + if (this.queue.Count == 0) this.running = false; + } } else { @@ -131,19 +152,22 @@ private async Task loop() public async Task throttle(object cost2) { var cost = (cost2 != null) ? Convert.ToDouble(cost2) : Convert.ToDouble(this.config["cost"]); - if (this.queue.Count > (int)this.config["maxLimiterRequests"]) + lock (queueLock) { - throw new Exception("throttle queue is over maxLimiterRequests (" + this.config["maxLimiterRequests"].ToString() + "), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526"); - } - var t = new Task(() => { }); - this.queue.Enqueue((t, cost)); - if (!this.running) - { - this.running = true; - // Task.Run(() => { this.loop(); }); - this.loop(); + if (this.queue.Count > (int)this.config["maxLimiterRequests"]) + { + throw new Exception("throttle queue is over maxLimiterRequests (" + this.config["maxLimiterRequests"].ToString() + "), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526"); + } + var t = new Task(() => { }); + this.queue.Enqueue((t, cost)); + if (!this.running) + { + this.running = true; + // Task.Run(() => { this.loop(); }); + this.loop(); + } + return t; } - return t; } // move this elsewhere later diff --git a/ts/src/base/Exchange.ts b/ts/src/base/Exchange.ts index e80329c68395d..a487dbd7111b0 100644 --- a/ts/src/base/Exchange.ts +++ b/ts/src/base/Exchange.ts @@ -1690,6 +1690,9 @@ export default class Exchange { 'countries': undefined, 'enableRateLimit': true, 'rateLimit': 2000, // milliseconds = seconds * 1000 + 'rateLimiterAlgorithm': this.rateLimiterAlgorithm, + 'rollingWindowSize': this.rollingWindowSize, + 'maxLimiterRequests': this.maxLimiterRequests, 'timeout': this.timeout, // milliseconds = seconds * 1000 'certified': false, // if certified by the CCXT dev team 'pro': false, // if it is integrated with CCXT Pro for WebSocket support From eb26af706f244bb325b0842761916619438cd03e Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Tue, 10 Jun 2025 00:35:26 -0600 Subject: [PATCH 04/38] python rate limiter fix --- python/ccxt/async_support/base/throttler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ccxt/async_support/base/throttler.py b/python/ccxt/async_support/base/throttler.py index 666f17ff8cf3d..ff6eaf1e184de 100644 --- a/python/ccxt/async_support/base/throttler.py +++ b/python/ccxt/async_support/base/throttler.py @@ -66,7 +66,7 @@ async def rolling_window_loop(self): await asyncio.sleep(wait_time / 1000) async def looper(self): - if self.config.get('rateLimiterAlogorithm', 'leakyBucket') == 'leakyBucket': + if self.config['algorithm'] == 'leakyBucket': await self.leaky_bucket_loop() else: await self.rolling_window_loop() From 2f29eaf459954f22ad6ea9f158ca88a55700dfdf Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Tue, 10 Jun 2025 02:32:14 -0600 Subject: [PATCH 05/38] rollingWindowLoop fixes --- ts/src/base/functions/throttle.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ts/src/base/functions/throttle.ts b/ts/src/base/functions/throttle.ts index f6135aef2c8c7..439a7b2083fe5 100644 --- a/ts/src/base/functions/throttle.ts +++ b/ts/src/base/functions/throttle.ts @@ -93,7 +93,7 @@ class Throttler { } async loop () { - if (this.config['rateLimiterAlogorithm'] === 'leakyBucket') { + if (this.config['algorithm'] === 'leakyBucket') { await this.leakyBucketLoop (); } else { await this.rollingWindowLoop (); From 55e493db38c4e0be575f6897c25358c8bea8c983 Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Tue, 10 Jun 2025 16:38:52 -0600 Subject: [PATCH 06/38] ratelimiter go fixes --- go/v4/exchange_options.go | 3 +++ go/v4/exchange_throttler.go | 29 ++++++++++++++--------------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/go/v4/exchange_options.go b/go/v4/exchange_options.go index 55d7a303ae578..fef6f68e6b855 100644 --- a/go/v4/exchange_options.go +++ b/go/v4/exchange_options.go @@ -377,6 +377,9 @@ func (this *Exchange) initializeProperties(extendedProperties map[string]interfa } this.EnableRateLimit = SafeValue(extendedProperties, "enableRateLimit", true).(bool) this.RateLimit = SafeFloat(extendedProperties, "rateLimit", -1).(float64) + this.RateLimiterAlgorithm = SafeString(extendedProperties, "rateLimiterAlgorithm", "leakyBucket").(string) + this.MaxLimiterRequests = int(SafeInteger(extendedProperties, "maxLimiterRequests", 1000).(int64)) + this.RollingWindowSize = int(SafeInteger(extendedProperties, "rollingWindowSize", 60000).(int64)) // this.status = SafeValue(extendedProperties, "status",map[string]interface{}{}).(map[string]interface{}) this.PrecisionMode = int(SafeInteger(extendedProperties, "precisionMode", this.PrecisionMode).(int64)) this.PaddingMode = int(SafeInteger(extendedProperties, "paddingMode", this.PaddingMode).(int64)) diff --git a/go/v4/exchange_throttler.go b/go/v4/exchange_throttler.go index 453cdea4615e6..fc2619138a199 100644 --- a/go/v4/exchange_throttler.go +++ b/go/v4/exchange_throttler.go @@ -12,30 +12,29 @@ type TimestampedCost struct { } type Throttler struct { - Queue Queue - Running bool - Config map[string]interface{} + Queue Queue + Running bool + Config map[string]interface{} Timestamps []TimestampedCost } func NewThrottler(config map[string]interface{}) Throttler { defaultConfig := map[string]interface{}{ - "refillRate": 1.0, - "delay": 0.001, - "capacity": 1.0, + "refillRate": 1.0, + "delay": 0.001, + "capacity": 1.0, "maxLimiterRequests": 2000, - "tokens": 0, - "cost": 1.0, - "algorithm": "leakyBucket", - "rateLimit": 0.0, - "windowSize": 60000.0, - "maxWeight": 0.0, + "tokens": 0, + "cost": 1.0, + "algorithm": "leakyBucket", + "rateLimit": 0.0, + "windowSize": 60000.0, } return Throttler{ - Queue: NewQueue(), - Running: false, - Config: ExtendMap(defaultConfig, config), + Queue: NewQueue(), + Running: false, + Config: ExtendMap(defaultConfig, config), Timestamps: []TimestampedCost{}, } } From e0a6de7c04d144f3ee8ad5e446edff7ad0b8a6ab Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Tue, 10 Jun 2025 17:19:23 -0600 Subject: [PATCH 07/38] go throttler is lock safe --- go/v4/exchange_throttler.go | 51 +++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/go/v4/exchange_throttler.go b/go/v4/exchange_throttler.go index fc2619138a199..1889298388a97 100644 --- a/go/v4/exchange_throttler.go +++ b/go/v4/exchange_throttler.go @@ -1,6 +1,7 @@ package ccxt import ( + "sync" "time" u "github.com/google/uuid" @@ -16,6 +17,7 @@ type Throttler struct { Running bool Config map[string]interface{} Timestamps []TimestampedCost + Mutex sync.Mutex } func NewThrottler(config map[string]interface{}) Throttler { @@ -45,7 +47,9 @@ func (t *Throttler) Throttle(cost2 interface{}) <-chan bool { if cost2 != nil { cost = cost2.(float64) } else { + t.Mutex.Lock() cost = ToFloat64(t.Config["cost"]) + t.Mutex.Unlock() } task := make(chan bool) @@ -57,13 +61,19 @@ func (t *Throttler) Throttle(cost2 interface{}) <-chan bool { t.Queue.Enqueue(queueElement) + t.Mutex.Lock() if !t.Running { t.Running = true - if t.Config["algorithm"] == "leakyBucket" { + algorithm := t.Config["algorithm"] + t.Mutex.Unlock() + + if algorithm == "leakyBucket" { go t.leakyBucketLoop() } else { go t.rollingWindowLoop() } + } else { + t.Mutex.Unlock() } return task @@ -72,71 +82,80 @@ func (t *Throttler) Throttle(cost2 interface{}) <-chan bool { func (t *Throttler) leakyBucketLoop() { lastTimestamp := Milliseconds() - for t.Running { + for { if t.Queue.IsEmpty() { + t.Mutex.Lock() t.Running = false - continue + t.Mutex.Unlock() + return } + first, _ := t.Queue.Peek() task := first.Task cost := first.Cost + t.Mutex.Lock() tokens := ToFloat64(t.Config["tokens"]) - if tokens >= 0 { t.Config["tokens"] = tokens - cost + t.Mutex.Unlock() if task != nil { task <- true close(task) } t.Queue.Dequeue() - - if t.Queue.IsEmpty() { - t.Running = false - } } else { sleepTime := ToFloat64(t.Config["delay"]) * 1000 + t.Mutex.Unlock() time.Sleep(time.Duration(sleepTime) * time.Millisecond) + current := Milliseconds() elapsed := current - lastTimestamp lastTimestamp = current + + t.Mutex.Lock() sumTokens := ToFloat64(t.Config["refillRate"]) * ToFloat64(elapsed) - tokens := ToFloat64(t.Config["tokens"]) + sumTokens + tokens = ToFloat64(t.Config["tokens"]) + sumTokens t.Config["tokens"] = MathMin(tokens, ToFloat64(t.Config["capacity"])) + t.Mutex.Unlock() } } - } func (t *Throttler) rollingWindowLoop() { - for t.Running { + for { if t.Queue.IsEmpty() { + t.Mutex.Lock() t.Running = false - continue + t.Mutex.Unlock() + return } + first, _ := t.Queue.Peek() task := first.Task cost := first.Cost now := Milliseconds() + + t.Mutex.Lock() windowSize := ToFloat64(t.Config["windowSize"]) maxWeight := ToFloat64(t.Config["maxWeight"]) - t.Timestamps = filterTimestamps(t.Timestamps, now, windowSize) totalCost := sumCosts(t.Timestamps) if totalCost+cost <= maxWeight { t.Timestamps = append(t.Timestamps, TimestampedCost{Timestamp: now, Cost: cost}) + t.Mutex.Unlock() + if task != nil { task <- true close(task) } t.Queue.Dequeue() - if t.Queue.IsEmpty() { - t.Running = false - } } else { waitTime := int64((t.Timestamps[0].Timestamp + int64(windowSize)) - now) + t.Mutex.Unlock() + if waitTime > 0 { time.Sleep(time.Duration(waitTime) * time.Millisecond) } From aad1d091ae31bd90ca794c88b79d4122827f4ad0 Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Tue, 10 Jun 2025 20:29:35 -0600 Subject: [PATCH 08/38] go rate limiter fix --- go/v4/exchange_throttler.go | 28 ++++++++++++++-------------- ts/src/base/Exchange.ts | 3 --- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/go/v4/exchange_throttler.go b/go/v4/exchange_throttler.go index 1889298388a97..1ea2de78906b4 100644 --- a/go/v4/exchange_throttler.go +++ b/go/v4/exchange_throttler.go @@ -13,30 +13,30 @@ type TimestampedCost struct { } type Throttler struct { - Queue Queue - Running bool - Config map[string]interface{} + Queue Queue + Running bool + Config map[string]interface{} Timestamps []TimestampedCost Mutex sync.Mutex } func NewThrottler(config map[string]interface{}) Throttler { defaultConfig := map[string]interface{}{ - "refillRate": 1.0, - "delay": 0.001, - "capacity": 1.0, + "refillRate": 1.0, + "delay": 0.001, + "capacity": 1.0, "maxLimiterRequests": 2000, - "tokens": 0, - "cost": 1.0, - "algorithm": "leakyBucket", - "rateLimit": 0.0, - "windowSize": 60000.0, + "tokens": 0, + "cost": 1.0, + "algorithm": "leakyBucket", + "rateLimit": 0.0, + "windowSize": 60000.0, } return Throttler{ - Queue: NewQueue(), - Running: false, - Config: ExtendMap(defaultConfig, config), + Queue: NewQueue(), + Running: false, + Config: ExtendMap(defaultConfig, config), Timestamps: []TimestampedCost{}, } } diff --git a/ts/src/base/Exchange.ts b/ts/src/base/Exchange.ts index a487dbd7111b0..e80329c68395d 100644 --- a/ts/src/base/Exchange.ts +++ b/ts/src/base/Exchange.ts @@ -1690,9 +1690,6 @@ export default class Exchange { 'countries': undefined, 'enableRateLimit': true, 'rateLimit': 2000, // milliseconds = seconds * 1000 - 'rateLimiterAlgorithm': this.rateLimiterAlgorithm, - 'rollingWindowSize': this.rollingWindowSize, - 'maxLimiterRequests': this.maxLimiterRequests, 'timeout': this.timeout, // milliseconds = seconds * 1000 'certified': false, // if certified by the CCXT dev team 'pro': false, // if it is integrated with CCXT Pro for WebSocket support From 0d554a1ccc4d2965893d8eead1546c71a3801574 Mon Sep 17 00:00:00 2001 From: "T. Todua" <7117978+ttodua@users.noreply.github.com> Date: Fri, 13 Jun 2025 17:20:03 +0400 Subject: [PATCH 09/38] Update cs/ccxt/base/Exchange.BaseMethods.cs --- cs/ccxt/base/Exchange.BaseMethods.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/cs/ccxt/base/Exchange.BaseMethods.cs b/cs/ccxt/base/Exchange.BaseMethods.cs index 5178458986938..fa709a5ce8edc 100644 --- a/cs/ccxt/base/Exchange.BaseMethods.cs +++ b/cs/ccxt/base/Exchange.BaseMethods.cs @@ -1444,9 +1444,6 @@ public virtual void initRestRateLimiter() { "cost", 1 }, { "maxLimiterRequests", this.maxLimiterRequests }, { "refillRate", refillRate }, - { "algorithm", this.rateLimiterAlgorithm }, - { "windowSize", this.rollingWindowSize }, - { "maxWeight", this.rollingWindowSize / this.rateLimit }, // ms_of_window / ms_of_rate_limit }; object existingBucket = ((bool) isTrue((isEqual(this.tokenBucket, null)))) ? new Dictionary() {} : this.tokenBucket; this.tokenBucket = this.extend(defaultBucket, existingBucket); From ef9cf93a74107c33f84a9456e41b9700b5825973 Mon Sep 17 00:00:00 2001 From: "T. Todua" <7117978+ttodua@users.noreply.github.com> Date: Fri, 13 Jun 2025 17:20:12 +0400 Subject: [PATCH 10/38] Update cs/ccxt/base/Exchange.BaseMethods.cs --- cs/ccxt/base/Exchange.BaseMethods.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/cs/ccxt/base/Exchange.BaseMethods.cs b/cs/ccxt/base/Exchange.BaseMethods.cs index fa709a5ce8edc..2b90a82b6304c 100644 --- a/cs/ccxt/base/Exchange.BaseMethods.cs +++ b/cs/ccxt/base/Exchange.BaseMethods.cs @@ -1442,7 +1442,6 @@ public virtual void initRestRateLimiter() { "delay", 0.001 }, { "capacity", 1 }, { "cost", 1 }, - { "maxLimiterRequests", this.maxLimiterRequests }, { "refillRate", refillRate }, }; object existingBucket = ((bool) isTrue((isEqual(this.tokenBucket, null)))) ? new Dictionary() {} : this.tokenBucket; From b760ac68f370dc824c8a0776befdf5a17f84d8df Mon Sep 17 00:00:00 2001 From: "T. Todua" <7117978+ttodua@users.noreply.github.com> Date: Fri, 13 Jun 2025 17:20:32 +0400 Subject: [PATCH 11/38] Update cs/ccxt/base/Exchange.BaseMethods.cs --- cs/ccxt/base/Exchange.BaseMethods.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/cs/ccxt/base/Exchange.BaseMethods.cs b/cs/ccxt/base/Exchange.BaseMethods.cs index 2b90a82b6304c..7b6414b8702e9 100644 --- a/cs/ccxt/base/Exchange.BaseMethods.cs +++ b/cs/ccxt/base/Exchange.BaseMethods.cs @@ -14,9 +14,6 @@ public virtual object describe() { "countries", null }, { "enableRateLimit", true }, { "rateLimit", 2000 }, - { "rateLimiterAlgorithm", "leakyBucket" }, // rollingWindow or leakyBucket - { "maxLimiterRequests", 1000 }, - { "rollingWindowSize", 60000 }, { "timeout", this.timeout }, { "certified", false }, { "pro", false }, From 6f70fa80699686f51390a845d8eb57dbff72351e Mon Sep 17 00:00:00 2001 From: "T. Todua" <7117978+ttodua@users.noreply.github.com> Date: Fri, 13 Jun 2025 17:21:02 +0400 Subject: [PATCH 12/38] Update cs/ccxt/base/Exchange.BaseMethods.cs --- cs/ccxt/base/Exchange.BaseMethods.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/cs/ccxt/base/Exchange.BaseMethods.cs b/cs/ccxt/base/Exchange.BaseMethods.cs index 7b6414b8702e9..20318258fef5d 100644 --- a/cs/ccxt/base/Exchange.BaseMethods.cs +++ b/cs/ccxt/base/Exchange.BaseMethods.cs @@ -1439,6 +1439,7 @@ public virtual void initRestRateLimiter() { "delay", 0.001 }, { "capacity", 1 }, { "cost", 1 }, + { "maxCapacity", 1000 }, { "refillRate", refillRate }, }; object existingBucket = ((bool) isTrue((isEqual(this.tokenBucket, null)))) ? new Dictionary() {} : this.tokenBucket; From 70ae641ad190b5ffce4c0e374682e5efe4d5e46e Mon Sep 17 00:00:00 2001 From: "T. Todua" <7117978+ttodua@users.noreply.github.com> Date: Fri, 13 Jun 2025 17:30:16 +0400 Subject: [PATCH 13/38] Update cs/ccxt/base/Throttler.cs --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index c8f4e03d9b1a7..c527d47dcdb6e 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -139,7 +139,7 @@ private async Task rollingWindowLoop() private async Task loop() { - if (this.config["algorithm"].ToString() == "leakyBucket") + if (this.config["algorithm"] as string == "leakyBucket") { await leakyBucketLoop(); } From 26f2c6c6a2f572d8244e63269e2dda5fe6448e23 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Sat, 14 Jun 2025 00:12:58 +0400 Subject: [PATCH 14/38] move out of lock --- cs/ccxt/base/Throttler.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index c8f4e03d9b1a7..81b092f1faa33 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -152,22 +152,22 @@ private async Task loop() public async Task throttle(object cost2) { var cost = (cost2 != null) ? Convert.ToDouble(cost2) : Convert.ToDouble(this.config["cost"]); + var t = new Task(() => { }); lock (queueLock) { if (this.queue.Count > (int)this.config["maxLimiterRequests"]) { throw new Exception("throttle queue is over maxLimiterRequests (" + this.config["maxLimiterRequests"].ToString() + "), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526"); } - var t = new Task(() => { }); this.queue.Enqueue((t, cost)); - if (!this.running) - { - this.running = true; - // Task.Run(() => { this.loop(); }); - this.loop(); - } - return t; } + if (!this.running) + { + this.running = true; + // Task.Run(() => { this.loop(); }); + this.loop(); + } + return t; } // move this elsewhere later From 87a62643b5e0f4b57592dc956a302b203d0c818d Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Sat, 14 Jun 2025 00:23:12 +0400 Subject: [PATCH 15/38] tokens > cost --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index f5b7080c1aaae..3614c1657c6f3 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -55,7 +55,7 @@ private async Task leakyBucketLoop() var cost = first.Item2; var tokensAsString = Convert.ToString(this.config["tokens"], CultureInfo.InvariantCulture); var floatTokens = double.Parse(tokensAsString, CultureInfo.InvariantCulture); - if (floatTokens >= 0) + if (floatTokens >= cost) { this.config["tokens"] = floatTokens - cost; await Task.Delay(0); From 833b96eb7cf91c6485c98f55d2e5248ca323a87d Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Sat, 14 Jun 2025 00:39:43 +0400 Subject: [PATCH 16/38] updates --- cs/ccxt/base/Throttler.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 3614c1657c6f3..c0e2460c8d955 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -20,7 +20,7 @@ public Throttler(dict config) {"refillRate",1.0}, {"delay", 0.001}, {"cost", 1.0}, - {"tokens", 0}, + {"tokens", 0.0}, {"maxLimiterRequests", 2000}, {"capacity", 1.0}, {"algorithm", "leakyBucket"}, @@ -53,8 +53,7 @@ private async Task leakyBucketLoop() } var task = first.Item1; var cost = first.Item2; - var tokensAsString = Convert.ToString(this.config["tokens"], CultureInfo.InvariantCulture); - var floatTokens = double.Parse(tokensAsString, CultureInfo.InvariantCulture); + var floatTokens = Convert.ToDouble(this.config["tokens"]); if (floatTokens >= cost) { this.config["tokens"] = floatTokens - cost; @@ -83,7 +82,7 @@ private async Task leakyBucketLoop() var elapsed = current - lastTimestamp; lastTimestamp = current; var tokens = (double)this.config["tokens"] + ((double)this.config["refillRate"] * elapsed); - this.config["tokens"] = Math.Min(tokens, (int)this.config["capacity"]); + this.config["tokens"] = Math.Min(tokens, (double)this.config["capacity"]); } } @@ -127,6 +126,10 @@ private async Task rollingWindowLoop() } else { + if (timestamps.Count <= 0) + { + continue; + } var earliest = timestamps[0].timestamp; var waitTime = (earliest + Convert.ToDouble(this.config["windowSize"])) - now; if (waitTime > 0) From 718f6652287488bc3aed366bd343615ab4916f26 Mon Sep 17 00:00:00 2001 From: "T. Todua" <7117978+ttodua@users.noreply.github.com> Date: Sat, 14 Jun 2025 10:27:47 +0400 Subject: [PATCH 17/38] Update cs/ccxt/base/Throttler.cs --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index c0e2460c8d955..464463025e0e9 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -54,7 +54,7 @@ private async Task leakyBucketLoop() var task = first.Item1; var cost = first.Item2; var floatTokens = Convert.ToDouble(this.config["tokens"]); - if (floatTokens >= cost) + if (floatTokens >= 0) { this.config["tokens"] = floatTokens - cost; await Task.Delay(0); From 28e1081897f2755a0c97f89fa1e5570b4765f182 Mon Sep 17 00:00:00 2001 From: "T. Todua" <7117978+ttodua@users.noreply.github.com> Date: Sat, 14 Jun 2025 11:37:41 +0400 Subject: [PATCH 18/38] Update cs/ccxt/base/Throttler.cs --- cs/ccxt/base/Throttler.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 464463025e0e9..2b9a150d9d48c 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -108,7 +108,8 @@ private async Task rollingWindowLoop() var task = first.Item1; var cost = first.Item2; var now = milliseconds(); - timestamps = timestamps.Where(t => now - t.timestamp < Convert.ToDouble(this.config["windowSize"])).ToList(); + var windowSize = Convert.ToDouble(this.config["windowSize"]); + timestamps = timestamps.Where(t => now - t.timestamp < windowSize).ToList(); var totalCost = timestamps.Sum(t => t.cost); if (totalCost + cost <= Convert.ToDouble(this.config["maxWeight"])) { From 3f108d07ba2d311da48b669e938421c92826d81f Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Sat, 14 Jun 2025 11:38:59 +0400 Subject: [PATCH 19/38] var change --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 2b9a150d9d48c..59bd1390472f5 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -132,7 +132,7 @@ private async Task rollingWindowLoop() continue; } var earliest = timestamps[0].timestamp; - var waitTime = (earliest + Convert.ToDouble(this.config["windowSize"])) - now; + var waitTime = (earliest + windowSize) - now; if (waitTime > 0) { await Task.Delay((int)waitTime); From e7be89c3f545262a34a67449a832e9399638b3f9 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Sat, 14 Jun 2025 11:53:09 +0400 Subject: [PATCH 20/38] rev go formatting --- go/v4/exchange.go | 119 +++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 59 deletions(-) diff --git a/go/v4/exchange.go b/go/v4/exchange.go index ce8810eb03619..1e1edf7ad67c0 100644 --- a/go/v4/exchange.go +++ b/go/v4/exchange.go @@ -18,64 +18,64 @@ import ( ) type Exchange struct { - marketsMutex sync.Mutex - cachedCurrenciesMutex sync.Mutex - loadMu sync.Mutex - marketsLoading bool - marketsLoaded bool - loadMarketsSubscribers []chan interface{} - Itf interface{} - DerivedExchange IDerivedExchange - methodCache sync.Map - cacheLoaded bool - Version string - Id string - Name string - Options map[string]interface{} - Has map[string]interface{} - Api map[string]interface{} - TransformedApi map[string]interface{} - Markets map[string]interface{} - Markets_by_id *sync.Map - Currencies_by_id *sync.Map - Currencies map[string]interface{} - RequiredCredentials map[string]interface{} - HttpExceptions map[string]interface{} - MarketsById map[string]interface{} - Timeframes map[string]interface{} - Features map[string]interface{} - Exceptions map[string]interface{} - Precision map[string]interface{} - Urls interface{} - UserAgents map[string]interface{} - Timeout int64 - MAX_VALUE float64 - RateLimit float64 - TokenBucket map[string]interface{} - Throttler Throttler - NewUpdates bool - Alias bool - Verbose bool - UserAgent string - EnableRateLimit bool - RateLimiterAlgorithm string // rollingWindow or leakyBucket - MaxLimiterRequests int - RollingWindowSize int - Url string - Hostname string - BaseCurrencies map[string]interface{} - QuoteCurrencies map[string]interface{} - ReloadingMarkets bool - MarketsLoading bool - Symbols []string - Codes []string - Ids []string - CommonCurrencies map[string]interface{} - PrecisionMode int - Limits map[string]interface{} - Fees map[string]interface{} - CurrenciesById map[string]interface{} - ReduceFees bool + marketsMutex sync.Mutex + cachedCurrenciesMutex sync.Mutex + loadMu sync.Mutex + marketsLoading bool + marketsLoaded bool + loadMarketsSubscribers []chan interface{} + Itf interface{} + DerivedExchange IDerivedExchange + methodCache sync.Map + cacheLoaded bool + Version string + Id string + Name string + Options map[string]interface{} + Has map[string]interface{} + Api map[string]interface{} + TransformedApi map[string]interface{} + Markets map[string]interface{} + Markets_by_id *sync.Map + Currencies_by_id *sync.Map + Currencies map[string]interface{} + RequiredCredentials map[string]interface{} + HttpExceptions map[string]interface{} + MarketsById map[string]interface{} + Timeframes map[string]interface{} + Features map[string]interface{} + Exceptions map[string]interface{} + Precision map[string]interface{} + Urls interface{} + UserAgents map[string]interface{} + Timeout int64 + MAX_VALUE float64 + RateLimit float64 + RateLimiterAlgorithm string // rollingWindow or leakyBucket + MaxLimiterRequests int + RollingWindowSize int + TokenBucket map[string]interface{} + Throttler Throttler + NewUpdates bool + Alias bool + Verbose bool + UserAgent string + EnableRateLimit bool + Url string + Hostname string + BaseCurrencies map[string]interface{} + QuoteCurrencies map[string]interface{} + ReloadingMarkets bool + MarketsLoading bool + Symbols []string + Codes []string + Ids []string + CommonCurrencies map[string]interface{} + PrecisionMode int + Limits map[string]interface{} + Fees map[string]interface{} + CurrenciesById map[string]interface{} + ReduceFees bool AccountsById interface{} Accounts interface{} @@ -280,7 +280,7 @@ func (this *Exchange) LoadMarkets(params ...interface{}) <-chan interface{} { if !this.marketsLoading || reload { this.marketsLoading = true - markets := <-this.LoadMarketsHelper(params...) + markets := <- this.LoadMarketsHelper(params...) this.marketsLoaded = true this.marketsLoading = false for _, ch := range this.loadMarketsSubscribers { @@ -295,6 +295,7 @@ func (this *Exchange) LoadMarkets(params ...interface{}) <-chan interface{} { } + func (this *Exchange) LoadMarketsHelper(params ...interface{}) <-chan interface{} { ch := make(chan interface{}) From 7dcc605952b7dbea68d111ec3931135bac7af00a Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 06:03:26 -0600 Subject: [PATCH 21/38] Update ts/src/base/functions/throttle.ts Co-authored-by: T. Todua <7117978+ttodua@users.noreply.github.com> --- ts/src/base/functions/throttle.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/ts/src/base/functions/throttle.ts b/ts/src/base/functions/throttle.ts index 439a7b2083fe5..df3d4eb7e1f86 100644 --- a/ts/src/base/functions/throttle.ts +++ b/ts/src/base/functions/throttle.ts @@ -69,8 +69,17 @@ class Throttler { while (this.running) { const { resolver, cost } = this.queue[0]; const nowTime = now (); - this.timestamps = this.timestamps.filter (t => nowTime - t.timestamp < this.config.windowSize); // Remove timestamps outside the rolling window - const totalCost = this.timestamps.reduce ((sum, t) => sum + t.cost, 0); // Calculate the total cost of requests still in the window + // Remove timestamps outside the rolling window + // and + // Calculate the total cost of requests still in the window + let totalCost = 0; + for (let i = this.timestamps.length - 1; i >= 0; i--) { + if (nowTime - this.timestamps[i].timestamp >= this.config.windowSize) { + this.timestamps.splice(i, 1); + } else { + totalCost += this.timestamps[i].cost; + } + } if (totalCost + cost <= this.config.maxWeight) { // Enough capacity, proceed with request this.timestamps.push ({ timestamp: nowTime, cost }); From dac6a77237f0b3db090d6ef5e9ebdf53f5036a8a Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 06:03:51 -0600 Subject: [PATCH 22/38] Update cs/ccxt/base/Throttler.cs Co-authored-by: T. Todua <7117978+ttodua@users.noreply.github.com> --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 59bd1390472f5..bcef49a82e57f 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -109,7 +109,7 @@ private async Task rollingWindowLoop() var cost = first.Item2; var now = milliseconds(); var windowSize = Convert.ToDouble(this.config["windowSize"]); - timestamps = timestamps.Where(t => now - t.timestamp < windowSize).ToList(); + timestamps.RemoveAll(t => now - t.timestamp >= windowSize); var totalCost = timestamps.Sum(t => t.cost); if (totalCost + cost <= Convert.ToDouble(this.config["maxWeight"])) { From 97bbd3253a56902229e7c617dbe2d108bae6d18b Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 06:24:19 -0600 Subject: [PATCH 23/38] removed lock from leaky bucket loop --- cs/ccxt/base/Throttler.cs | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index c8f4e03d9b1a7..81fa50be43865 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -38,19 +38,12 @@ private async Task leakyBucketLoop() while (this.running) { // do we need this check here? - lock (queueLock) + if (this.queue.Count == 0) { - if (this.queue.Count == 0) - { - this.running = false; - continue; - } - } - (Task, double) first; - lock (queueLock) - { - first = this.queue.Peek(); + this.running = false; + continue; } + var first = this.queue.Peek(); var task = first.Item1; var cost = first.Item2; var tokensAsString = Convert.ToString(this.config["tokens"], CultureInfo.InvariantCulture); @@ -66,14 +59,11 @@ private async Task leakyBucketLoop() task.Start(); } } - lock (queueLock) - { - this.queue.Dequeue(); + this.queue.Dequeue(); - if (this.queue.Count == 0) - { - this.running = false; - } + if (this.queue.Count == 0) + { + this.running = false; } } else From fdabf94fe0a3197c0b8e18b45f3d66d31b6273db Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 06:52:05 -0600 Subject: [PATCH 24/38] throttler config typed object --- cs/ccxt/base/Throttler.cs | 69 +++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index ffa2a6f8167e6..7b2e05c1ba6b8 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -3,33 +3,46 @@ namespace ccxt; using dict = Dictionary; -public class Throttler + +public class ThrottlerConfig { + public double RefillRate { get; set; } = 1.0; + public double Delay { get; set; } = 0.001; + public double Cost { get; set; } = 1.0; + public double Tokens { get; set; } = 0.0; + public int MaxLimiterRequests { get; set; } = 2000; + public double Capacity { get; set; } = 1.0; + public string Algorithm { get; set; } = "leakyBucket"; + public double RateLimit { get; set; } = 0.0; + public double WindowSize { get; set; } = 60000.0; + public double MaxWeight { get; set; } +} - private dict config = new dict(); +public class Throttler +{ + private ThrottlerConfig config = new ThrottlerConfig(); private Queue<(Task, double)> queue = new Queue<(Task, double)>(); private readonly object queueLock = new object(); private bool running = false; private List<(long timestamp, double cost)> timestamps = new List<(long, double)>(); - public Throttler(dict config) + public Throttler(Dictionary configInput) { - this.config = new Dictionary() + // Convert the dictionary input to our typed config + if (configInput != null) { - {"refillRate",1.0}, - {"delay", 0.001}, - {"cost", 1.0}, - {"tokens", 0.0}, - {"maxLimiterRequests", 2000}, - {"capacity", 1.0}, - {"algorithm", "leakyBucket"}, - {"rateLimit", 0.0}, - {"windowSize", 60000.0}, - // maxWeight should be set in config parameter for rolling window algorithm - }; - this.config = extend(this.config, config); - + config.RefillRate = configInput.TryGetValue("refillRate", out var refillRate) ? Convert.ToDouble(refillRate) : config.RefillRate; + config.Delay = configInput.TryGetValue("delay", out var delay) ? Convert.ToDouble(delay) : config.Delay; + config.Cost = configInput.TryGetValue("cost", out var cost) ? Convert.ToDouble(cost) : config.Cost; + config.Tokens = configInput.TryGetValue("tokens", out var tokens) ? Convert.ToDouble(tokens) : config.Tokens; + config.MaxLimiterRequests = configInput.TryGetValue("maxLimiterRequests", out var maxRequests) ? Convert.ToInt32(maxRequests) : config.MaxLimiterRequests; + config.Capacity = configInput.TryGetValue("capacity", out var capacity) ? Convert.ToDouble(capacity) : config.Capacity; + config.Algorithm = configInput.TryGetValue("algorithm", out var algorithm) ? Convert.ToString(algorithm) : config.Algorithm; + config.RateLimit = configInput.TryGetValue("rateLimit", out var rateLimit) ? Convert.ToDouble(rateLimit) : config.RateLimit; + config.WindowSize = configInput.TryGetValue("windowSize", out var windowSize) ? Convert.ToDouble(windowSize) : config.WindowSize; + config.MaxWeight = configInput.TryGetValue("maxWeight", out var maxWeight) ? Convert.ToDouble(maxWeight) : config.MaxWeight; + } } private async Task leakyBucketLoop() @@ -46,10 +59,10 @@ private async Task leakyBucketLoop() var first = this.queue.Peek(); var task = first.Item1; var cost = first.Item2; - var floatTokens = Convert.ToDouble(this.config["tokens"]); + var floatTokens = Convert.ToDouble(this.config.Tokens); if (floatTokens >= 0) { - this.config["tokens"] = floatTokens - cost; + this.config.Tokens = floatTokens - cost; await Task.Delay(0); if (task != null) { @@ -67,12 +80,12 @@ private async Task leakyBucketLoop() } else { - await Task.Delay((int)((double)this.config["delay"] * 1000)); + await Task.Delay((int)((double)this.config.Delay * 1000)); var current = milliseconds(); var elapsed = current - lastTimestamp; lastTimestamp = current; - var tokens = (double)this.config["tokens"] + ((double)this.config["refillRate"] * elapsed); - this.config["tokens"] = Math.Min(tokens, (double)this.config["capacity"]); + var tokens = (double)this.config.Tokens + ((double)this.config.RefillRate * elapsed); + this.config.Tokens = Math.Min(tokens, (double)this.config.Capacity); } } @@ -98,10 +111,10 @@ private async Task rollingWindowLoop() var task = first.Item1; var cost = first.Item2; var now = milliseconds(); - var windowSize = Convert.ToDouble(this.config["windowSize"]); + var windowSize = Convert.ToDouble(this.config.WindowSize); timestamps.RemoveAll(t => now - t.timestamp >= windowSize); var totalCost = timestamps.Sum(t => t.cost); - if (totalCost + cost <= Convert.ToDouble(this.config["maxWeight"])) + if (totalCost + cost <= Convert.ToDouble(this.config.MaxWeight)) { timestamps.Add((now, cost)); await Task.Delay(0); @@ -133,7 +146,7 @@ private async Task rollingWindowLoop() private async Task loop() { - if (this.config["algorithm"] as string == "leakyBucket") + if (this.config.Algorithm == "leakyBucket") { await leakyBucketLoop(); } @@ -145,13 +158,13 @@ private async Task loop() public async Task throttle(object cost2) { - var cost = (cost2 != null) ? Convert.ToDouble(cost2) : Convert.ToDouble(this.config["cost"]); + var cost = (cost2 != null) ? Convert.ToDouble(cost2) : Convert.ToDouble(this.config.Cost); var t = new Task(() => { }); lock (queueLock) { - if (this.queue.Count > (int)this.config["maxLimiterRequests"]) + if (this.queue.Count > (int)this.config.MaxLimiterRequests) { - throw new Exception("throttle queue is over maxLimiterRequests (" + this.config["maxLimiterRequests"].ToString() + "), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526"); + throw new Exception("throttle queue is over maxLimiterRequests (" + this.config.MaxLimiterRequests.ToString() + "), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526"); } this.queue.Enqueue((t, cost)); } From 8290df65fd9b2647b0446a992407af44066a00bb Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 17:28:11 +0400 Subject: [PATCH 25/38] rev --- cs/ccxt/base/Throttler.cs | 15 +++++++++++---- ts/src/base/functions/throttle.ts | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 7b2e05c1ba6b8..bc73b4468172c 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -62,8 +62,12 @@ private async Task leakyBucketLoop() var floatTokens = Convert.ToDouble(this.config.Tokens); if (floatTokens >= 0) { +<<<<<<< HEAD + this.config["tokens"] = floatTokens - cost; +======= this.config.Tokens = floatTokens - cost; await Task.Delay(0); +>>>>>>> fdabf94fe0a3197c0b8e18b45f3d66d31b6273db if (task != null) { if (task.Status == TaskStatus.Created) @@ -71,7 +75,14 @@ private async Task leakyBucketLoop() task.Start(); } } +<<<<<<< HEAD + await Task.Delay(0); + lock (queueLock) + { + this.queue.Dequeue(); +======= this.queue.Dequeue(); +>>>>>>> fdabf94fe0a3197c0b8e18b45f3d66d31b6273db if (this.queue.Count == 0) { @@ -130,10 +141,6 @@ private async Task rollingWindowLoop() } else { - if (timestamps.Count <= 0) - { - continue; - } var earliest = timestamps[0].timestamp; var waitTime = (earliest + windowSize) - now; if (waitTime > 0) diff --git a/ts/src/base/functions/throttle.ts b/ts/src/base/functions/throttle.ts index df3d4eb7e1f86..a051048712f0d 100644 --- a/ts/src/base/functions/throttle.ts +++ b/ts/src/base/functions/throttle.ts @@ -99,7 +99,7 @@ class Throttler { } } } - } + } async loop () { if (this.config['algorithm'] === 'leakyBucket') { From d88fabf2f763a7e72b1c3d0f6d41bd2629f28392 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 19:17:41 +0400 Subject: [PATCH 26/38] min --- cs/ccxt/base/Throttler.cs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index bc73b4468172c..b3096240962c2 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -62,12 +62,7 @@ private async Task leakyBucketLoop() var floatTokens = Convert.ToDouble(this.config.Tokens); if (floatTokens >= 0) { -<<<<<<< HEAD - this.config["tokens"] = floatTokens - cost; -======= this.config.Tokens = floatTokens - cost; - await Task.Delay(0); ->>>>>>> fdabf94fe0a3197c0b8e18b45f3d66d31b6273db if (task != null) { if (task.Status == TaskStatus.Created) @@ -75,18 +70,14 @@ private async Task leakyBucketLoop() task.Start(); } } -<<<<<<< HEAD await Task.Delay(0); lock (queueLock) { this.queue.Dequeue(); -======= - this.queue.Dequeue(); ->>>>>>> fdabf94fe0a3197c0b8e18b45f3d66d31b6273db - - if (this.queue.Count == 0) - { - this.running = false; + if (this.queue.Count == 0) + { + this.running = false; + } } } else @@ -99,7 +90,6 @@ private async Task leakyBucketLoop() this.config.Tokens = Math.Min(tokens, (double)this.config.Capacity); } } - } private async Task rollingWindowLoop() From 09498f5340e7281d7dbbd01a056959425c8c4f5f Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 19:37:18 +0400 Subject: [PATCH 27/38] spacing --- ts/src/base/functions/throttle.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ts/src/base/functions/throttle.ts b/ts/src/base/functions/throttle.ts index a051048712f0d..1ebac26ab9ae6 100644 --- a/ts/src/base/functions/throttle.ts +++ b/ts/src/base/functions/throttle.ts @@ -69,17 +69,17 @@ class Throttler { while (this.running) { const { resolver, cost } = this.queue[0]; const nowTime = now (); - // Remove timestamps outside the rolling window - // and - // Calculate the total cost of requests still in the window - let totalCost = 0; - for (let i = this.timestamps.length - 1; i >= 0; i--) { - if (nowTime - this.timestamps[i].timestamp >= this.config.windowSize) { - this.timestamps.splice(i, 1); - } else { - totalCost += this.timestamps[i].cost; + // Remove timestamps outside the rolling window + // and + // Calculate the total cost of requests still in the window + let totalCost = 0; + for (let i = this.timestamps.length - 1; i >= 0; i--) { + if (nowTime - this.timestamps[i].timestamp >= this.config.windowSize) { + this.timestamps.splice(i, 1); + } else { + totalCost += this.timestamps[i].cost; + } } - } if (totalCost + cost <= this.config.maxWeight) { // Enough capacity, proceed with request this.timestamps.push ({ timestamp: nowTime, cost }); From eca339102a17de835701d3a8824ae1a9706f3802 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 20:05:25 +0400 Subject: [PATCH 28/38] ts simplify --- ts/src/base/functions/throttle.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ts/src/base/functions/throttle.ts b/ts/src/base/functions/throttle.ts index 1ebac26ab9ae6..0ca2958da5458 100644 --- a/ts/src/base/functions/throttle.ts +++ b/ts/src/base/functions/throttle.ts @@ -69,12 +69,11 @@ class Throttler { while (this.running) { const { resolver, cost } = this.queue[0]; const nowTime = now (); - // Remove timestamps outside the rolling window - // and - // Calculate the total cost of requests still in the window + const cutOffTime = nowTime - this.config.windowSize; + // Remove expired timestamps & sum remaining requests let totalCost = 0; for (let i = this.timestamps.length - 1; i >= 0; i--) { - if (nowTime - this.timestamps[i].timestamp >= this.config.windowSize) { + if (this.timestamps[i].timestamp <= cutOffTime) { this.timestamps.splice(i, 1); } else { totalCost += this.timestamps[i].cost; From c0df8920581e9c0a900ccb743741a5fc79cd2db2 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 20:05:53 +0400 Subject: [PATCH 29/38] php clauses --- php/async/Throttler.php | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/php/async/Throttler.php b/php/async/Throttler.php index a0f012fdd064c..0a115306638fa 100644 --- a/php/async/Throttler.php +++ b/php/async/Throttler.php @@ -69,12 +69,15 @@ public function rollingWindowLoop() { list($future, $cost) = $this->queue->bottom(); $cost = $cost ? $cost : $this->config['cost']; $now = microtime(true) * 1000.0; - $this->timestamps = array_values(array_filter($this->timestamps, function ($entry) use ($now) { - return ($now - $entry['timestamp']) < $this->config['windowSize']; - })); - $total_cost = array_reduce($this->timestamps, function ($sum, $entry) { - return $sum + $entry['cost']; - }, 0); + $total_cost = 0; + $cutoffTime = $now - $this->config['windowSize']; + for ($i = count($this->timestamps) - 1; $i >= 0; $i--) { + if ($this->timestamps[$i]['timestamp'] <= $cutoffTime) { + array_splice($this->timestamps, $i, 1); + } else { + $total_cost += $this->timestamps[$i]['cost']; + } + } if ($total_cost + $cost <= $this->config['maxWeight']) { $this->timestamps[] = array('timestamp' => $now, 'cost' => $cost); $future->resolve(null); From 361340d4dd6ed049d89d3e67be44586a3a4b43ed Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 20:06:35 +0400 Subject: [PATCH 30/38] use sleep in php --- php/async/Throttler.php | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/php/async/Throttler.php b/php/async/Throttler.php index 0a115306638fa..c710eee790f4f 100644 --- a/php/async/Throttler.php +++ b/php/async/Throttler.php @@ -82,8 +82,7 @@ public function rollingWindowLoop() { $this->timestamps[] = array('timestamp' => $now, 'cost' => $cost); $future->resolve(null); $this->queue->dequeue(); - # context switch? - # yield 0; + Async\await(React\Promise\Timer\sleep(0)); // context switch if ($this->queue->count() === 0) { $this->running = false; } @@ -91,12 +90,7 @@ public function rollingWindowLoop() { $earliest = $this->timestamps[0]['timestamp']; $wait_time = ($earliest + $this->config['windowSize']) - $now; if ($wait_time > 0) { - $sleep = new Promise(function ($resolve) use ($wait_time) { - Loop::addTimer($wait_time / 1000, function () use ($resolve) { - $resolve(null); - }); - }); - Async\await($sleep); + Async\await(React\Promise\Timer\sleep($wait_time / 1000)); } } } From 868dbe5add54a6d22758409267897ede07237b24 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 20:23:40 +0400 Subject: [PATCH 31/38] simplify php --- php/async/Throttler.php | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/php/async/Throttler.php b/php/async/Throttler.php index c710eee790f4f..e1511ca95cb21 100644 --- a/php/async/Throttler.php +++ b/php/async/Throttler.php @@ -28,7 +28,7 @@ public function __construct($config) { ), $config); $this->queue = new \SplQueue(); $this->running = false; - $this->timestamps = array(); + $this->timestamps = []; } public function leakyBucketLoop() { @@ -47,13 +47,7 @@ public function leakyBucketLoop() { $this->running = false; } } else { - $time = $this->config['delay']; - $sleep = new Promise(function ($resolve) use ($time) { - Loop::addTimer($time, function () use ($resolve) { - $resolve(null); - }); - }); - Async\await($sleep); + Async\await(React\Promise\Timer\sleep($this->config['delay'])); $now = microtime(true) * 1000; $elapsed = $now - $last_timestamp; $last_timestamp = $now; @@ -79,7 +73,7 @@ public function rollingWindowLoop() { } } if ($total_cost + $cost <= $this->config['maxWeight']) { - $this->timestamps[] = array('timestamp' => $now, 'cost' => $cost); + $this->timestamps[] = ['timestamp' => $now, 'cost' => $cost]; $future->resolve(null); $this->queue->dequeue(); Async\await(React\Promise\Timer\sleep(0)); // context switch From 19a90e0e2123f9ad32357086de1824e6ccb72b09 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Mon, 16 Jun 2025 21:06:10 +0400 Subject: [PATCH 32/38] optimizations - ts, php, py --- php/async/Throttler.php | 18 +++++++++++------- python/ccxt/async_support/base/throttler.py | 14 +++++++++++--- ts/src/base/functions/throttle.ts | 15 +++++++++------ 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/php/async/Throttler.php b/php/async/Throttler.php index e1511ca95cb21..ad083d16d3c5a 100644 --- a/php/async/Throttler.php +++ b/php/async/Throttler.php @@ -63,16 +63,20 @@ public function rollingWindowLoop() { list($future, $cost) = $this->queue->bottom(); $cost = $cost ? $cost : $this->config['cost']; $now = microtime(true) * 1000.0; - $total_cost = 0; $cutoffTime = $now - $this->config['windowSize']; - for ($i = count($this->timestamps) - 1; $i >= 0; $i--) { - if ($this->timestamps[$i]['timestamp'] <= $cutoffTime) { - array_splice($this->timestamps, $i, 1); - } else { - $total_cost += $this->timestamps[$i]['cost']; + $totalCost = 0; + // Remove expired timestamps & sum the remaining requests + $timestamps = []; + for ($i = 0; $i < count($this->timestamps); $i++) { + $element = $this->timestamps[$i]; + if ($element['timestamp'] > $cutoffTime) { + $totalCost += $element['cost']; + $timestamps[] = $element; } } - if ($total_cost + $cost <= $this->config['maxWeight']) { + $this->timestamps = $timestamps; + // handle current request + if ($totalCost + $cost <= $this->config['maxWeight']) { $this->timestamps[] = ['timestamp' => $now, 'cost' => $cost]; $future->resolve(null); $this->queue->dequeue(); diff --git a/python/ccxt/async_support/base/throttler.py b/python/ccxt/async_support/base/throttler.py index ff6eaf1e184de..849ceaca054e6 100644 --- a/python/ccxt/async_support/base/throttler.py +++ b/python/ccxt/async_support/base/throttler.py @@ -49,9 +49,17 @@ async def rolling_window_loop(self): future, cost = self.queue[0] cost = self.config['cost'] if cost is None else cost now = time() * 1000 - self.timestamps = [t for t in self.timestamps if now - t['timestamp'] < self.config['windowSize']] - total_cost = sum(t['cost'] for t in self.timestamps) - if total_cost + cost <= self.config['maxWeight']: + cutoffTime = now - self.config['windowSize'] + totalCost = 0 + # Remove expired timestamps & sum the remaining requests + timestamps = [] + for t in self.timestamps: + if t['timestamp'] > cutoffTime: + totalCost += t['cost'] + timestamps.append(t) + self.timestamps = timestamps + # handle current request + if totalCost + cost <= self.config['maxWeight']: self.timestamps.append({'timestamp': now, 'cost': cost}) if not future.done(): future.set_result(None) diff --git a/ts/src/base/functions/throttle.ts b/ts/src/base/functions/throttle.ts index 0ca2958da5458..698a6af66476a 100644 --- a/ts/src/base/functions/throttle.ts +++ b/ts/src/base/functions/throttle.ts @@ -70,15 +70,18 @@ class Throttler { const { resolver, cost } = this.queue[0]; const nowTime = now (); const cutOffTime = nowTime - this.config.windowSize; - // Remove expired timestamps & sum remaining requests let totalCost = 0; - for (let i = this.timestamps.length - 1; i >= 0; i--) { - if (this.timestamps[i].timestamp <= cutOffTime) { - this.timestamps.splice(i, 1); - } else { - totalCost += this.timestamps[i].cost; + // Remove expired timestamps & sum the remaining requests + const timestamps = []; + for (let i = 0; i < this.timestamps.length; i++) { + const element = this.timestamps[i]; + if (element.timestamp > cutOffTime) { + totalCost += element.cost; + timestamps.push(element); } } + this.timestamps = timestamps; + // handle current request if (totalCost + cost <= this.config.maxWeight) { // Enough capacity, proceed with request this.timestamps.push ({ timestamp: nowTime, cost }); From 1fe42b821841526dfa18e39c7385c73e5196210a Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 14:26:40 -0600 Subject: [PATCH 33/38] Update cs/ccxt/base/Throttler.cs Co-authored-by: T. Todua <7117978+ttodua@users.noreply.github.com> --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index b3096240962c2..5bd13f5187d0e 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -82,7 +82,7 @@ private async Task leakyBucketLoop() } else { - await Task.Delay((int)((double)this.config.Delay * 1000)); + await Task.Delay((int)(this.config.Delay * 1000)); var current = milliseconds(); var elapsed = current - lastTimestamp; lastTimestamp = current; From c96778281522b8cf76bbcbcc3a3acf9dc4abc50c Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 14:26:47 -0600 Subject: [PATCH 34/38] Update cs/ccxt/base/Throttler.cs Co-authored-by: T. Todua <7117978+ttodua@users.noreply.github.com> --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 5bd13f5187d0e..a4c9b846bcc2b 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -115,7 +115,7 @@ private async Task rollingWindowLoop() var windowSize = Convert.ToDouble(this.config.WindowSize); timestamps.RemoveAll(t => now - t.timestamp >= windowSize); var totalCost = timestamps.Sum(t => t.cost); - if (totalCost + cost <= Convert.ToDouble(this.config.MaxWeight)) + if (totalCost + cost <= this.config.MaxWeight) { timestamps.Add((now, cost)); await Task.Delay(0); From 1bee5c42c3ca637b2cd61aeefc2b561fb8f2a938 Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 14:27:16 -0600 Subject: [PATCH 35/38] Update cs/ccxt/base/Throttler.cs Co-authored-by: T. Todua <7117978+ttodua@users.noreply.github.com> --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index a4c9b846bcc2b..a81f6a2008ee2 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -112,7 +112,7 @@ private async Task rollingWindowLoop() var task = first.Item1; var cost = first.Item2; var now = milliseconds(); - var windowSize = Convert.ToDouble(this.config.WindowSize); + var windowSize = this.config.WindowSize; timestamps.RemoveAll(t => now - t.timestamp >= windowSize); var totalCost = timestamps.Sum(t => t.cost); if (totalCost + cost <= this.config.MaxWeight) From a2db733d6571be91e5d1b5b9b86e385ee656a65c Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 14:28:11 -0600 Subject: [PATCH 36/38] Update cs/ccxt/base/Throttler.cs Co-authored-by: T. Todua <7117978+ttodua@users.noreply.github.com> --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index a81f6a2008ee2..8c5c931ec0ffd 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -118,11 +118,11 @@ private async Task rollingWindowLoop() if (totalCost + cost <= this.config.MaxWeight) { timestamps.Add((now, cost)); - await Task.Delay(0); if (task != null && task.Status == TaskStatus.Created) { task.Start(); } + await Task.Delay(0); lock (queueLock) { this.queue.Dequeue(); From b1e81f06020623a52202510070bc93dd273dfa55 Mon Sep 17 00:00:00 2001 From: caoilainnl Date: Mon, 16 Jun 2025 14:34:06 -0600 Subject: [PATCH 37/38] throttler updates --- cs/ccxt/base/Throttler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/ccxt/base/Throttler.cs b/cs/ccxt/base/Throttler.cs index 8c5c931ec0ffd..748781d142994 100644 --- a/cs/ccxt/base/Throttler.cs +++ b/cs/ccxt/base/Throttler.cs @@ -59,7 +59,7 @@ private async Task leakyBucketLoop() var first = this.queue.Peek(); var task = first.Item1; var cost = first.Item2; - var floatTokens = Convert.ToDouble(this.config.Tokens); + var floatTokens = this.config.Tokens; if (floatTokens >= 0) { this.config.Tokens = floatTokens - cost; From 5beff86ccac0e92e4a88533b11e44e1eedf31c08 Mon Sep 17 00:00:00 2001 From: "t.t" <7117978+ttodua@users.noreply.github.com> Date: Tue, 17 Jun 2025 07:49:10 +0400 Subject: [PATCH 38/38] sleep fix --- php/async/Throttler.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/php/async/Throttler.php b/php/async/Throttler.php index ad083d16d3c5a..73e0dcdb5ab6a 100644 --- a/php/async/Throttler.php +++ b/php/async/Throttler.php @@ -47,7 +47,7 @@ public function leakyBucketLoop() { $this->running = false; } } else { - Async\await(React\Promise\Timer\sleep($this->config['delay'])); + Async\await(\React\Promise\Timer\sleep($this->config['delay'])); $now = microtime(true) * 1000; $elapsed = $now - $last_timestamp; $last_timestamp = $now; @@ -80,7 +80,7 @@ public function rollingWindowLoop() { $this->timestamps[] = ['timestamp' => $now, 'cost' => $cost]; $future->resolve(null); $this->queue->dequeue(); - Async\await(React\Promise\Timer\sleep(0)); // context switch + Async\await(\React\Promise\Timer\sleep(0)); // context switch if ($this->queue->count() === 0) { $this->running = false; } @@ -88,7 +88,7 @@ public function rollingWindowLoop() { $earliest = $this->timestamps[0]['timestamp']; $wait_time = ($earliest + $this->config['windowSize']) - $now; if ($wait_time > 0) { - Async\await(React\Promise\Timer\sleep($wait_time / 1000)); + Async\await(\React\Promise\Timer\sleep($wait_time / 1000)); } } }