package couchbase

import (
	"fmt"
	"sync"

	"github.com/couchbase/goutils/logging"
)

// PersistTo is the number of nodes, master included, on which a mutation
// must be persisted before the durability requirement is met.
type PersistTo uint8

// Persistence requirements accepted by SetObserveAndPersist.
const (
	PersistNone   = PersistTo(0x00)
	PersistMaster = PersistTo(0x01)
	PersistOne    = PersistTo(0x02)
	PersistTwo    = PersistTo(0x03)
	PersistThree  = PersistTo(0x04)
	PersistFour   = PersistTo(0x05)
)

// ObserveTo is the number of replicas that must have seen a mutation before
// the durability requirement is met.
type ObserveTo uint8

// Replication requirements accepted by SetObserveAndPersist.
const (
	ObserveNone           = ObserveTo(0x00)
	ObserveReplicateOne   = ObserveTo(0x01)
	ObserveReplicateTwo   = ObserveTo(0x02)
	ObserveReplicateThree = ObserveTo(0x03)
	ObserveReplicateFour  = ObserveTo(0x04)
)

// JobType distinguishes replication checks from persistence checks.
type JobType uint8

// Kinds of job handled by the observe/persist pollers.
const (
	OBSERVE = JobType(0x00)
	PERSIST = JobType(0x01)
)

// ObservePersistJob is one sequence-number check against one host. The
// poller fills in failover, lastPersistedSeqNo and currentSeqNo and hands
// the job back on resultChan, or reports a failure on errorChan.
type ObservePersistJob struct {
	vb                 uint16
	vbuuid             uint64
	hostname           string
	jobType            JobType
	failover           uint8
	lastPersistedSeqNo uint64
	currentSeqNo       uint64
	resultChan         chan *ObservePersistJob
	errorChan          chan *OPErrResponse
}

// OPErrResponse reports a job that could not be executed.
type OPErrResponse struct {
	vb     uint16
	vbuuid uint64
	err    error
	job    *ObservePersistJob
}

// ObservePersistPool recycles ObservePersistJob values between polls.
var ObservePersistPool = NewPool(1024)

// OPJobChan is the work queue consumed by the pollers.
var OPJobChan = make(chan *ObservePersistJob, 1024)

// OPJobDone tells a poller to exit; each receive stops one poller.
var OPJobDone = make(chan bool)

// wg tracks the pollers started by StartOPPollers.
var wg sync.WaitGroup

// StartOPPollers starts maxWorkers poller goroutines and blocks until all of
// them have exited.
func (b *Bucket) StartOPPollers(maxWorkers int) {
	for i := 0; i < maxWorkers; i++ {
		// Add must run before the goroutine is started; otherwise a poller
		// that exits immediately could call Done on a zero counter.
		wg.Add(1)
		go b.OPJobPoll()
	}
	wg.Wait()
}

// SetObserveAndPersist stores the durability requirements for the bucket.
// It returns an error when the requirements exceed the number of nodes or
// replicas, or when mutation tokens are not enabled.
//
// NOTE(review): b.ds is written here without taking the bucket lock, while
// ObserveAndPersistPoll reads it under RLock — confirm callers serialise this.
func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
	numNodes := len(b.Nodes())
	if int(nPersist) > numNodes || int(nObserve) > numNodes {
		return fmt.Errorf("Not enough healthy nodes in the cluster")
	}

	if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
		return fmt.Errorf("Not enough replicas in the cluster")
	}

	if !EnableMutationToken {
		return fmt.Errorf("Mutation Tokens not enabled ")
	}

	b.ds = &DurablitySettings{Persist: nPersist, Observe: nObserve}
	return nil
}

// ObserveAndPersistPoll waits until the mutation identified by seqNo on
// vbucket vb satisfies the bucket's durability settings. It queues one job
// per required host and re-queues each job until that host has caught up.
//
// It returns immediately when no durability settings are configured or when
// they require nothing. err carries the last job error seen, if any;
// failover is true when any host reported a failover (failover is not
// otherwise handled yet).
func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
	b.RLock()
	ds := b.ds
	b.RUnlock()

	if ds == nil {
		return nil, false
	}

	needObserve := int(ds.Observe)
	needPersist := int(ds.Persist)
	if needObserve == 0 && needPersist == 0 {
		// Nothing to wait for; without this guard the loop below would block
		// forever because no job would ever report back.
		return nil, false
	}

	// nodes[0] is the master and nodes[1:] are the replicas, so observing
	// needs nodes[1..needObserve] and persisting needs nodes[0..needPersist-1].
	nodes := b.GetNodeList(vb)
	if (needObserve > 0 && needObserve >= len(nodes)) || needPersist > len(nodes) {
		return fmt.Errorf("Not enough healthy nodes in the cluster"), false
	}

	logging.Infof("Node list %v", nodes)

	// Sized so that a poller never blocks when handing a job back.
	resultChan := make(chan *ObservePersistJob, needObserve+needPersist)
	errChan := make(chan *OPErrResponse, needObserve+needPersist)

	pending := 0 // jobs queued and not yet completed
	enqueue := func(jobType JobType, hostname string) {
		job := ObservePersistPool.Get()
		// Overwrite every field so nothing leaks from a previous use.
		*job = ObservePersistJob{
			vb:         vb,
			vbuuid:     vbuuid,
			hostname:   hostname,
			jobType:    jobType,
			resultChan: resultChan,
			errorChan:  errChan,
		}
		OPJobChan <- job
		pending++
	}

	for i := 1; i <= needObserve; i++ {
		enqueue(OBSERVE, nodes[i])
	}
	for i := 0; i < needPersist; i++ {
		enqueue(PERSIST, nodes[i])
	}

	for pending > 0 {
		select {
		case res := <-resultChan:
			if res.failover != 0 {
				// Not currently handling failover scenarios TODO
				failover = true
				pending--
				ObservePersistPool.Put(res)
				continue
			}

			reached := res.currentSeqNo
			if res.jobType == PERSIST {
				reached = res.lastPersistedSeqNo
			}

			if reached >= seqNo {
				pending--
				ObservePersistPool.Put(res)
			} else {
				// Host has not caught up yet; poll it again.
				OPJobChan <- res
			}

		case opErr := <-errChan:
			logging.Errorf("Error in Observe/Persist %v", opErr.err)
			err = fmt.Errorf("Error in Observe/Persist job %v", opErr.err)
			pending--
			ObservePersistPool.Put(opErr.job)
		}
	}

	return err, failover
}

// OPJobPoll is the poller loop: it executes jobs from OPJobChan until a
// value arrives on OPJobDone, then marks itself done on the package
// WaitGroup.
func (b *Bucket) OPJobPoll() {
	defer wg.Done()

	for {
		select {
		case job := <-OPJobChan:
			b.runOPJob(job)
		case <-OPJobDone:
			logging.Infof("Observe Persist Poller exitting")
			return
		}
	}
}

// runOPJob performs a single ObserveSeq call for job and delivers either the
// updated job on its result channel or an OPErrResponse on its error channel.
func (b *Bucket) runOPJob(job *ObservePersistJob) {
	pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
	if pool == nil {
		sendOPError(job, fmt.Errorf("Pool not found for host %v", job.hostname))
		return
	}

	conn, err := pool.Get()
	if err != nil {
		sendOPError(job, fmt.Errorf("Unable to get connection from pool %v", err))
		return
	}

	res, err := conn.ObserveSeq(job.vb, job.vbuuid)
	// Hand the connection back on the failure path as well, so a failed
	// command does not permanently consume a pool slot.
	pool.Return(conn)
	if err != nil {
		sendOPError(job, fmt.Errorf("Command failed %v", err))
		return
	}

	job.lastPersistedSeqNo = res.LastPersistedSeqNo
	job.currentSeqNo = res.CurrentSeqNo
	job.failover = res.Failover
	job.resultChan <- job
}

// sendOPError reports err for job on the job's error channel.
func sendOPError(job *ObservePersistJob, err error) {
	job.errorChan <- &OPErrResponse{
		vb:     job.vb,
		vbuuid: job.vbuuid,
		err:    err,
		job:    job,
	}
}

// GetNodeList returns the host names serving vbucket vb, in the order given
// by the vbucket server map. Entries whose server index is negative, or whose
// host name is shorter than two characters, are left as empty strings. It
// returns nil when vb is outside the map.
func (b *Bucket) GetNodeList(vb uint16) []string {
	vbm := b.VBServerMap()
	if int(vb) >= len(vbm.VBucketMap) {
		logging.Infof("vbmap smaller than vblist")
		return nil
	}

	servers := vbm.VBucketMap[vb]
	nodes := make([]string, len(servers))
	for i, n := range servers {
		if n < 0 {
			continue
		}
		if node := b.getMasterNode(n); len(node) > 1 {
			nodes[i] = node
		}
	}
	return nodes
}

// OPpool is a bounded free list of ObservePersistJob values.
type OPpool struct {
	pool chan *ObservePersistJob
}

// NewPool creates a job pool that retains at most max idle jobs.
func NewPool(max int) *OPpool {
	return &OPpool{
		pool: make(chan *ObservePersistJob, max),
	}
}

// Get takes a job from the pool, allocating a new one when the pool is
// empty. A recycled job still carries the field values of its previous use.
func (p *OPpool) Get() *ObservePersistJob {
	select {
	case o := <-p.pool:
		return o
	default:
		return &ObservePersistJob{}
	}
}

// Put hands a job back to the pool; the job is dropped when the pool is full.
func (p *OPpool) Put(o *ObservePersistJob) {
	select {
	case p.pool <- o:
	default:
		// Pool is full; let the garbage collector reclaim the job.
	}
}