I am exploring etcd to implement a sequential number generator for a distributed environment. My requirement is to generate non-repeating sequential numbers, one per request, across multiple instances of the same application. There can be n such applications with this requirement. I did a POC for this in multiple ways, using STM and the mutex locks provided in the Go client packages.
With a single-node etcd server on my local machine (production will use a cluster of at least 3 nodes for Raft to work), I wrote a simple program that generates IDs (numbers) in 500 goroutines. Each goroutine gets 10 IDs, for a total of 5000 IDs. Going by the timing stats, STM with retry attempts performs better than mutex locks. Apart from these approaches, are there any better options for sequential number generation? Can etcd be used for this purpose in the first place?
PS: I am attaching the code sample just for reference. I don't expect it to be reviewed. My concern is just the correct approach to generate sequential numbers with etcd
package main
import (
"context"
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
CONC "go.etcd.io/etcd/clientv3/concurrency"
"github.com/golang/glog"
ETCD "go.etcd.io/etcd/clientv3"
)
// client is the shared etcd client used by every generator method; main assigns it.
var client *ETCD.Client
// deadline bounds the context created for a single ID-generation call.
var deadline = 200 * time.Second
func main() {
var err error
client, err = ETCD.New(ETCD.Config{
Endpoints: []string{"127.0.0.1:2379"},
})
if err != nil {
glog.Errorln("err:", err)
return
}
idGen := &SeqIDGenerator{key: "_id"}
err = func() error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
_, err = client.Put(ctx, idGen.key, strconv.FormatInt(0, 10))
return err
}()
if err != nil {
glog.Errorln("err:", err)
return
}
id, err := idGen.nextWithMutex()
if err != nil {
glog.Errorln("err:", err)
return
}
glog.Errorln("done", id)
id, err = idGen.nextWithSTMSerialiazable()
if err != nil {
glog.Errorln("err:", err)
return
}
glog.Errorln("done", id)
// st := time.Now()
// stressSTMSerialiazableSeq(idGen)
// glog.Errorln(time.Since(st))
}
// SeqIDGenerator hands out sequential int64 IDs backed by a counter stored in etcd.
type SeqIDGenerator struct {
// key is the etcd key whose value holds the last issued ID as a base-10 string.
key string
}
// nextWithSTMSerialiazable increments the counter stored under idGen.key
// inside a serializable STM transaction and returns the new value.
//
// The whole call is bounded by the package-level deadline. A failed
// transaction is attempted again up to retry times, with two exceptions that
// return immediately: a counter value that cannot be parsed (retrying cannot
// fix it) and an expired or cancelled context (every further attempt would
// fail the same way). When the retries run out, the returned error carries
// the message of the last underlying failure.
func (idGen *SeqIDGenerator) nextWithSTMSerialiazable() (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()

	var next int64
	// The STM runtime may invoke apply more than once before a commit goes
	// through, so next is recomputed from the freshly read value on every run.
	apply := func(s CONC.STM) error {
		cur, err := strconv.ParseInt(s.Get(idGen.key), 10, 64)
		if err != nil {
			return err
		}
		next = cur + 1
		s.Put(idGen.key, strconv.FormatInt(next, 10))
		return nil
	}

	var lastErr error
	for attempt := retry; attempt > 0; attempt-- {
		resp, err := CONC.NewSTMSerializable(ctx, client, apply)
		if err == nil {
			if resp.Succeeded {
				return next, nil
			}
			continue
		}
		// A missing or malformed counter value will not heal by retrying.
		if _, ok := err.(*strconv.NumError); ok {
			return 0, err
		}
		// Past the deadline every remaining attempt would fail immediately.
		if ctx.Err() != nil {
			return 0, err
		}
		lastErr = err
	}
	if lastErr != nil {
		return 0, errors.New("ID gen failed. Retry exceeded: " + lastErr.Error())
	}
	return 0, errors.New("ID gen failed. Retry exceeded")
}
// nextWithMutex increments the counter stored under idGen.key while holding
// an etcd distributed mutex and returns the new value.
//
// A fresh session is created for each call and closed before returning, so
// its lease and keep-alive do not outlive the call. The lock, read and write
// share one context bounded by the package-level deadline. An error is
// returned if the lock cannot be acquired, if the key does not exist, or if
// its value is not a base-10 integer.
func (idGen *SeqIDGenerator) nextWithMutex() (int64, error) {
	s, err := CONC.NewSession(client) // explore options to pass
	if err != nil {
		return 0, err
	}
	// Deferred first so it runs last, after the unlock below.
	defer s.Close()

	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()

	m := CONC.NewMutex(s, idGen.key)
	// Without the lock the read-modify-write below is not safe; bail out.
	if err := m.Lock(ctx); err != nil {
		return 0, err
	}
	defer func() {
		// ctx may already be expired here, so unlock with a fresh one.
		unlockCtx, unlockCancel := context.WithTimeout(context.Background(), deadline)
		defer unlockCancel()
		// Best effort: closing the session right after revokes its lease,
		// which releases the lock even if this call fails.
		_ = m.Unlock(unlockCtx)
	}()

	resp, err := client.Get(ctx, idGen.key)
	if err != nil {
		return 0, err
	}
	// Guard against a missing key instead of panicking on Kvs[0].
	if len(resp.Kvs) == 0 {
		return 0, errors.New("ID gen failed. Key not found: " + idGen.key)
	}
	cur, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64)
	if err != nil {
		return 0, err
	}

	next := cur + 1
	if _, err := client.Put(ctx, idGen.key, strconv.FormatInt(next, 10)); err != nil {
		return 0, err
	}
	return next, nil
}
// nextWithSTMReapeatable increments the counter stored under idGen.key
// inside a repeatable-read STM transaction and returns the new value.
//
// The whole call is bounded by the package-level deadline. A failed
// transaction is attempted again up to retry times, with two exceptions that
// return immediately: a counter value that cannot be parsed (retrying cannot
// fix it) and an expired or cancelled context (every further attempt would
// fail the same way). When the retries run out, the returned error carries
// the message of the last underlying failure.
func (idGen *SeqIDGenerator) nextWithSTMReapeatable() (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()

	var next int64
	// The STM runtime may invoke apply more than once before a commit goes
	// through, so next is recomputed from the freshly read value on every run.
	apply := func(s CONC.STM) error {
		cur, err := strconv.ParseInt(s.Get(idGen.key), 10, 64)
		if err != nil {
			return err
		}
		next = cur + 1
		s.Put(idGen.key, strconv.FormatInt(next, 10))
		return nil
	}

	var lastErr error
	for attempt := retry; attempt > 0; attempt-- {
		resp, err := CONC.NewSTMRepeatable(ctx, client, apply)
		if err == nil {
			if resp.Succeeded {
				return next, nil
			}
			continue
		}
		// A missing or malformed counter value will not heal by retrying.
		if _, ok := err.(*strconv.NumError); ok {
			return 0, err
		}
		// Past the deadline every remaining attempt would fail immediately.
		if ctx.Err() != nil {
			return 0, err
		}
		lastErr = err
	}
	if lastErr != nil {
		return 0, errors.New("ID gen failed. Retry exceeded: " + lastErr.Error())
	}
	return 0, errors.New("ID gen failed. Retry exceeded")
}
// n is the number of workers (or outer iterations) the stress helpers run; each one requests 10 IDs.
var n int = 500
// retry is the maximum number of STM attempts made per ID request before giving up.
var retry int = 40 // move as conf
// stressMutex starts n goroutines that each request up to 10 IDs through the
// mutex-based generator, and blocks until all of them are done. A worker logs
// the first error it hits and stops early.
func stressMutex(idGen *SeqIDGenerator) {
	const idsPerWorker = 10

	var workers sync.WaitGroup
	workers.Add(n)
	for w := 0; w < n; w++ {
		go func() {
			defer workers.Done()
			for got := 0; got < idsPerWorker; got++ {
				if _, err := idGen.nextWithMutex(); err != nil {
					glog.Errorln("err:", err)
					return
				}
			}
		}()
	}
	workers.Wait()
}
// stressMutexSeq requests n rounds of 10 IDs, one after another on the
// calling goroutine, through the mutex-based generator. Errors are logged and
// the run carries on.
func stressMutexSeq(idGen *SeqIDGenerator) {
	const idsPerRound = 10

	for round := 0; round < n; round++ {
		for got := 0; got < idsPerRound; got++ {
			if _, err := idGen.nextWithMutex(); err != nil {
				glog.Errorln("err:", err)
			}
		}
	}
}
// stressSTMSerialiazableSeq requests n rounds of 10 IDs, one after another on
// the calling goroutine, through the serializable STM generator. Errors are
// logged and the run carries on.
func stressSTMSerialiazableSeq(idGen *SeqIDGenerator) {
	const idsPerRound = 10

	for round := 0; round < n; round++ {
		for got := 0; got < idsPerRound; got++ {
			if _, err := idGen.nextWithSTMSerialiazable(); err != nil {
				glog.Errorln("err:", err)
			}
		}
	}
}
// stressSTMReapeatableSeq requests n rounds of 10 IDs, one after another on
// the calling goroutine, through the repeatable-read STM generator. Errors
// are logged and the run carries on.
func stressSTMReapeatableSeq(idGen *SeqIDGenerator) {
	const idsPerRound = 10

	for round := 0; round < n; round++ {
		for got := 0; got < idsPerRound; got++ {
			if _, err := idGen.nextWithSTMReapeatable(); err != nil {
				glog.Errorln("err:", err)
			}
		}
	}
}
// stressSTMSerialiazable starts n goroutines that each request 10 IDs through
// the serializable STM generator, waits for all of them, and logs how many
// requests succeeded. A failed request is logged and the worker moves on to
// its next request.
func stressSTMSerialiazable(idGen *SeqIDGenerator) {
	const idsPerWorker = 10

	var (
		workers   sync.WaitGroup
		succeeded int64
	)
	workers.Add(n)
	for w := 0; w < n; w++ {
		go func() {
			defer workers.Done()
			for got := 0; got < idsPerWorker; got++ {
				if _, err := idGen.nextWithSTMSerialiazable(); err != nil {
					glog.Errorln("err:", err)
					continue
				}
				atomic.AddInt64(&succeeded, 1)
			}
		}()
	}
	workers.Wait()
	glog.Errorln("success:", succeeded)
}
// stressSTMReapeatable starts n goroutines that each request 10 IDs through
// the repeatable-read STM generator, waits for all of them, and logs how many
// requests succeeded. A failed request is logged and the worker moves on to
// its next request.
func stressSTMReapeatable(idGen *SeqIDGenerator) {
	const idsPerWorker = 10

	var (
		workers   sync.WaitGroup
		succeeded int64
	)
	workers.Add(n)
	for w := 0; w < n; w++ {
		go func() {
			defer workers.Done()
			for got := 0; got < idsPerWorker; got++ {
				if _, err := idGen.nextWithSTMReapeatable(); err != nil {
					glog.Errorln("err:", err)
					continue
				}
				atomic.AddInt64(&succeeded, 1)
			}
		}()
	}
	workers.Wait()
	glog.Errorln("success:", succeeded)
}
Been looking at this myself. Here are two possible ways to do this: (I'm newer at etcd, so caveat emptor)
Use the value returned for "revision" in your client. This should effectively be an atomic operation that generates a unique value (globally) for the client invoking 'put'.
Try this:
Again, use the value returned for "mod_revision" in your client. This depends on 'prev-kv' being atomic with respect to 'put', so that any 'put' + 'prev-kv' forms a single atomic operation. I believe this to be so, but I don't have a citation.
Here are my rules for deciding between the two:
If I can accept unique values with gaps, I use the global "revision"; it can't be reset to zero unless you start with a new KV store. This is my preferred method, because it can't be reset accidentally.
If you must have unique sequential values, use the per-key "mod_revision". But be aware that if you accidentally delete the key/value used for tracking mod_revision, the serial number will start at zero again! This could cause big problems for you. The advantage here is that you can have multiple independent sequences of unique serial numbers.