diff --git a/go.mod b/go.mod index 89f92e574e..21778e9b67 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,8 @@ require ( github.com/blevesearch/go-porterstemmer v1.0.2 // indirect github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/boombuler/barcode v0.0.0-20161226211916-fe0f26ff6d26 // indirect + github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2 // indirect + github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85 // indirect github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd // indirect github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect diff --git a/go.sum b/go.sum index 5fb1dff7c4..5563e2d619 100644 --- a/go.sum +++ b/go.sum @@ -97,8 +97,12 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d h1:XMf4E1U+b9E3ElF0mjvfXZdflBRZz4gLp16nQ/QSHQM= github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c= +github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2 h1:vZryARwW4PSFXd9arwegEywvMTvPuXL3/oa+4L5NTe8= +github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c= github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b h1:bZ9rKU2/V8sY+NulSfxDOnXTWcs1rySqdF1sVepihvo= github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs= +github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85 h1:0WMIDtuXCKEm4wtAJgAAXa/qtM5O9MariLwgHaRlYmk= +github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs= github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd h1:zeuJhcG3f8eePshH3KxkNE+Xtl53pVln9MOUPMyr/1w= github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd/go.mod h1:xbc8Ff/oG7h2ejd7AlwOpfd+6QZntc92ygpAOfGwcKY= github.com/couchbaselabs/go-couchbase v0.0.0-20190708161019-23e7ca2ce2b7 h1:1XjEY/gnjQ+AfXef2U6dxCquhiRzkEpxZuWqs+QxTL8= diff --git a/vendor/github.com/couchbase/gomemcached/client/collections_filter.go b/vendor/github.com/couchbase/gomemcached/client/collections_filter.go new file mode 100644 index 0000000000..0bedae1c35 --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/client/collections_filter.go @@ -0,0 +1,123 @@ +package memcached + +import ( + "encoding/json" + "fmt" +) + +// Collection based filter +type CollectionsFilter struct { + ManifestUid uint64 + UseManifestUid bool + StreamId uint16 + UseStreamId bool + + // Use either ScopeId OR CollectionsList, not both + CollectionsList []uint32 + ScopeId uint32 +} + +type nonStreamIdNonResumeScopeMeta struct { + ScopeId string `json:"scope"` +} + +type nonStreamIdResumeScopeMeta struct { + ManifestId string `json:"uid"` +} + +type nonStreamIdNonResumeCollectionsMeta struct { + CollectionsList []string `json:"collections"` +} + +type nonStreamIdResumeCollectionsMeta struct { + ManifestId string `json:"uid"` + CollectionsList []string `json:"collections"` +} + +type streamIdNonResumeCollectionsMeta struct { + CollectionsList []string `json:"collections"` + StreamId uint16 `json:"sid"` +} + +type streamIdNonResumeScopeMeta struct { + ScopeId string `json:"scope"` + StreamId uint16 `json:"sid"` +} + +func (c *CollectionsFilter) IsValid() error { + if c.UseManifestUid { + return fmt.Errorf("Not implemented yet") + } + + if len(c.CollectionsList) > 0 && c.ScopeId > 0 { + return fmt.Errorf("Collection list is specified but scope ID is also specified") + } + + return nil +} + +func (c *CollectionsFilter) outputCollectionsFilterColList() (outputList []string) { + for _, collectionUint := range c.CollectionsList { + outputList = append(outputList, fmt.Sprintf("%x", collectionUint)) + } + return +} + +func (c *CollectionsFilter) outputScopeId() string { + return fmt.Sprintf("%x", c.ScopeId) +} + +func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error) { + if err := c.IsValid(); err != nil { + return nil, err + } + + var output interface{} + + switch c.UseStreamId { + case true: + switch c.UseManifestUid { + case true: + // TODO + return nil, fmt.Errorf("NotImplemented0") + case false: + switch len(c.CollectionsList) > 0 { + case true: + filter := &streamIdNonResumeCollectionsMeta{ + StreamId: c.StreamId, + CollectionsList: c.outputCollectionsFilterColList(), + } + output = *filter + case false: + filter := &streamIdNonResumeScopeMeta{ + StreamId: c.StreamId, + ScopeId: c.outputScopeId(), + } + output = *filter + } + } + case false: + switch c.UseManifestUid { + case true: + // TODO + return nil, fmt.Errorf("NotImplemented1") + case false: + switch len(c.CollectionsList) > 0 { + case true: + filter := &nonStreamIdNonResumeCollectionsMeta{ + CollectionsList: c.outputCollectionsFilterColList(), + } + output = *filter + case false: + output = nonStreamIdNonResumeScopeMeta{ScopeId: c.outputScopeId()} + } + } + } + + data, err := json.Marshal(output) + if err != nil { + return nil, err + } else { + return data, nil + } +} diff --git a/vendor/github.com/couchbase/gomemcached/client/mc.go b/vendor/github.com/couchbase/gomemcached/client/mc.go index 0f1d61e512..66c897c5d6 100644 --- a/vendor/github.com/couchbase/gomemcached/client/mc.go +++ b/vendor/github.com/couchbase/gomemcached/client/mc.go @@ -28,10 +28,12 @@ type ClientIface interface { CASNext(vb uint16, k string, exp int, state *CASState) bool CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) + CollectionEnabled() bool Close() error Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error) Del(vb uint16, key string) (*gomemcached.MCResponse, error) EnableMutationToken() (*gomemcached.MCResponse, error) + EnableFeatures(features Features) (*gomemcached.MCResponse, error) Get(vb uint16, key string) (*gomemcached.MCResponse, error) GetCollectionsManifest() (*gomemcached.MCResponse, error) GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error) @@ -76,9 +78,12 @@ var Healthy uint32 = 1 type Features []Feature type Feature uint16 -const FeatureMutationToken = Feature(0x04) +const FeatureTcpNoDelay = Feature(0x03) +const FeatureMutationToken = Feature(0x04) // XATTR bit in data type field with dcp mutations const FeatureXattr = Feature(0x06) +const FeatureXerror = Feature(0x07) const FeatureCollections = Feature(0x12) +const FeatureSnappyCompression = Feature(0x0a) const FeatureDataType = Feature(0x0b) type memcachedConnection interface { @@ -96,6 +101,9 @@ type Client struct { opaque uint32 hdrBuf []byte + + featureMtx sync.RWMutex + sentHeloFeatures Features } var ( @@ -285,6 +293,10 @@ func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, err binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature)) } + c.featureMtx.Lock() + c.sentHeloFeatures = features + c.featureMtx.Unlock() + return c.Send(&gomemcached.MCRequest{ Opcode: gomemcached.HELLO, Key: []byte("GoMemcached"), @@ -363,6 +375,18 @@ func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcache return res, nil } +func (c *Client) CollectionEnabled() bool { + c.featureMtx.RLock() + defer c.featureMtx.RUnlock() + + for _, feature := range c.sentHeloFeatures { + if feature == FeatureCollections { + return true + } + } + return false +} + // Get the value for a key, and update expiry func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) { extraBuf := make([]byte, 4) @@ -1138,3 +1162,7 @@ func IfResStatusError(response *gomemcached.MCResponse) bool { response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND && response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED) } + +func (c *Client) Conn() io.ReadWriteCloser { + return c.conn +} diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_event.go b/vendor/github.com/couchbase/gomemcached/client/upr_event.go new file mode 100644 index 0000000000..31e0abfbfd --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/client/upr_event.go @@ -0,0 +1,346 @@ +package memcached + +import ( + "encoding/binary" + "fmt" + "github.com/couchbase/gomemcached" + "math" +) + +type SystemEventType int + +const InvalidSysEvent SystemEventType = -1 + +const ( + CollectionCreate SystemEventType = 0 + CollectionDrop SystemEventType = iota + CollectionFlush SystemEventType = iota // KV did not implement + ScopeCreate SystemEventType = iota + ScopeDrop SystemEventType = iota + CollectionChanged SystemEventType = iota +) + +type ScopeCreateEvent interface { + GetSystemEventName() (string, error) + GetScopeId() (uint32, error) + GetManifestId() (uint64, error) +} + +type CollectionCreateEvent interface { + GetSystemEventName() (string, error) + GetScopeId() (uint32, error) + GetCollectionId() (uint32, error) + GetManifestId() (uint64, error) + GetMaxTTL() (uint32, error) +} + +type CollectionDropEvent interface { + GetScopeId() (uint32, error) + GetCollectionId() (uint32, error) + GetManifestId() (uint64, error) +} + +type ScopeDropEvent interface { + GetScopeId() (uint32, error) + GetManifestId() (uint64, error) +} + +type CollectionChangedEvent interface { + GetCollectionId() (uint32, error) + GetManifestId() (uint64, error) + GetMaxTTL() (uint32, error) +} + +var ErrorInvalidOp error = fmt.Errorf("Invalid Operation") +var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing") +var ErrorValueTooShort error = fmt.Errorf("Value length is too short") +var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL") + +// UprEvent memcached events for UPR streams. +type UprEvent struct { + Opcode gomemcached.CommandCode // Type of event + Status gomemcached.Status // Response status + VBucket uint16 // VBucket this event applies to + DataType uint8 // data type + Opaque uint16 // 16 MSB of opaque + VBuuid uint64 // This field is set by downstream + Flags uint32 // Item flags + Expiry uint32 // Item expiration time + Key, Value []byte // Item key/value + OldValue []byte // TODO: TBD: old document value + Cas uint64 // CAS value of the item + Seqno uint64 // sequence number of the mutation + RevSeqno uint64 // rev sequence number : deletions + LockTime uint32 // Lock time + MetadataSize uint16 // Metadata size + SnapstartSeq uint64 // start sequence number of this snapshot + SnapendSeq uint64 // End sequence number of the snapshot + SnapshotType uint32 // 0: disk 1: memory + FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number + Error error // Error value in case of a failure + ExtMeta []byte // Extended Metadata + AckSize uint32 // The number of bytes that can be Acked to DCP + SystemEvent SystemEventType // Only valid if IsSystemEvent() is true + SysEventVersion uint8 // Based on the version, the way Extra bytes is parsed is different + ValueLen int // Cache it to avoid len() calls for performance + CollectionId uint64 // Valid if Collection is in use +} + +// FailoverLog containing vvuid and sequnce number +type FailoverLog [][2]uint64 + +func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent { + event := &UprEvent{ + Opcode: rq.Opcode, + VBucket: stream.Vbucket, + VBuuid: stream.Vbuuid, + Value: rq.Body, + Cas: rq.Cas, + ExtMeta: rq.ExtMeta, + DataType: rq.DataType, + ValueLen: len(rq.Body), + SystemEvent: InvalidSysEvent, + CollectionId: math.MaxUint64, + } + + event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType) + + // set AckSize for events that need to be acked to DCP, + // i.e., events with CommandCodes that need to be buffered in DCP + if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok { + event.AckSize = uint32(bytesReceivedFromDCP) + } + + // 16 LSBits are used by client library to encode vbucket number. + // 16 MSBits are left for application to multiplex on opaque value. + event.Opaque = appOpaque(rq.Opaque) + + if len(rq.Extras) >= uprMutationExtraLen && + event.Opcode == gomemcached.UPR_MUTATION { + + event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) + event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20]) + event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24]) + event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28]) + event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30]) + + } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen && + event.Opcode == gomemcached.UPR_DELETION { + + event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) + event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20]) + + } else if len(rq.Extras) >= uprDeletetionExtraLen && + event.Opcode == gomemcached.UPR_DELETION || + event.Opcode == gomemcached.UPR_EXPIRATION { + + event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) + event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18]) + + } else if len(rq.Extras) >= uprSnapshotExtraLen && + event.Opcode == gomemcached.UPR_SNAPSHOT { + + event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8]) + event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16]) + event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20]) + } else if event.IsSystemEvent() { + event.PopulateEvent(rq.Extras) + } + + return event +} + +func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) { + switch streamType { + case CollectionsNonStreamId: + switch rq.Opcode { + // Only these will have CID encoded within the key + case gomemcached.UPR_MUTATION, + gomemcached.UPR_DELETION, + gomemcached.UPR_EXPIRATION: + uleb128 := Uleb128(rq.Key) + result, bytesShifted := uleb128.ToUint64(rq.Keylen) + event.CollectionId = result + event.Key = rq.Key[bytesShifted:] + default: + event.Key = rq.Key + } + case CollectionsStreamId: + // TODO - not implemented + fallthrough + case NonCollectionStream: + // Let default behavior be legacy stream type + fallthrough + default: + event.Key = rq.Key + } +} + +func (event *UprEvent) String() string { + name := gomemcached.CommandNames[event.Opcode] + if name == "" { + name = fmt.Sprintf("#%d", event.Opcode) + } + return name +} + +func (event *UprEvent) IsSnappyDataType() bool { + return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0) +} + +func (event *UprEvent) IsCollectionType() bool { + return event.IsSystemEvent() || event.CollectionId <= math.MaxUint32 +} + +func (event *UprEvent) IsSystemEvent() bool { + return event.Opcode == gomemcached.DCP_SYSTEM_EVENT +} + +func (event *UprEvent) PopulateEvent(extras []byte) { + if len(extras) < dcpSystemEventExtraLen { + // Wrong length, don't parse + return + } + event.Seqno = binary.BigEndian.Uint64(extras[:8]) + event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12])) + var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14]) + event.SysEventVersion = uint8(versionTemp >> 8) +} + +func (event *UprEvent) GetSystemEventName() (string, error) { + switch event.SystemEvent { + case CollectionCreate: + fallthrough + case ScopeCreate: + return string(event.Key), nil + default: + return "", ErrorInvalidOp + } +} + +func (event *UprEvent) GetManifestId() (uint64, error) { + switch event.SystemEvent { + // Version 0 only checks + case CollectionChanged: + fallthrough + case ScopeDrop: + fallthrough + case ScopeCreate: + fallthrough + case CollectionDrop: + if event.SysEventVersion > 0 { + return 0, ErrorInvalidVersion + } + fallthrough + case CollectionCreate: + // CollectionCreate supports version 1 + if event.SysEventVersion > 1 { + return 0, ErrorInvalidVersion + } + if event.ValueLen < 8 { + return 0, ErrorValueTooShort + } + return binary.BigEndian.Uint64(event.Value[0:8]), nil + default: + return 0, ErrorInvalidOp + } +} + +func (event *UprEvent) GetCollectionId() (uint32, error) { + switch event.SystemEvent { + case CollectionDrop: + if event.SysEventVersion > 0 { + return 0, ErrorInvalidVersion + } + fallthrough + case CollectionCreate: + if event.SysEventVersion > 1 { + return 0, ErrorInvalidVersion + } + if event.ValueLen < 16 { + return 0, ErrorValueTooShort + } + return binary.BigEndian.Uint32(event.Value[12:16]), nil + case CollectionChanged: + if event.SysEventVersion > 0 { + return 0, ErrorInvalidVersion + } + if event.ValueLen < 12 { + return 0, ErrorValueTooShort + } + return binary.BigEndian.Uint32(event.Value[8:12]), nil + default: + return 0, ErrorInvalidOp + } +} + +func (event *UprEvent) GetScopeId() (uint32, error) { + switch event.SystemEvent { + // version 0 checks + case ScopeCreate: + fallthrough + case ScopeDrop: + fallthrough + case CollectionDrop: + if event.SysEventVersion > 0 { + return 0, ErrorInvalidVersion + } + fallthrough + case CollectionCreate: + // CollectionCreate could be either 0 or 1 + if event.SysEventVersion > 1 { + return 0, ErrorInvalidVersion + } + if event.ValueLen < 12 { + return 0, ErrorValueTooShort + } + return binary.BigEndian.Uint32(event.Value[8:12]), nil + default: + return 0, ErrorInvalidOp + } +} + +func (event *UprEvent) GetMaxTTL() (uint32, error) { + switch event.SystemEvent { + case CollectionCreate: + if event.SysEventVersion < 1 { + return 0, ErrorNoMaxTTL + } + if event.ValueLen < 20 { + return 0, ErrorValueTooShort + } + return binary.BigEndian.Uint32(event.Value[16:20]), nil + case CollectionChanged: + if event.SysEventVersion > 0 { + return 0, ErrorInvalidVersion + } + if event.ValueLen < 16 { + return 0, ErrorValueTooShort + } + return binary.BigEndian.Uint32(event.Value[12:16]), nil + default: + return 0, ErrorInvalidOp + } +} + +type Uleb128 []byte + +func (u Uleb128) ToUint64(cachedLen int) (result uint64, bytesShifted int) { + var shift uint = 0 + + for curByte := 0; curByte < cachedLen; curByte++ { + oneByte := u[curByte] + last7Bits := 0x7f & oneByte + result |= uint64(last7Bits) << shift + bytesShifted++ + if oneByte&0x80 == 0 { + break + } + shift += 7 + } + + return +} diff --git a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go index 95fa12577f..085b03c145 100644 --- a/vendor/github.com/couchbase/gomemcached/client/upr_feed.go +++ b/vendor/github.com/couchbase/gomemcached/client/upr_feed.go @@ -19,6 +19,7 @@ const uprMutationExtraLen = 30 const uprDeletetionExtraLen = 18 const uprDeletetionWithDeletionTimeExtraLen = 21 const uprSnapshotExtraLen = 20 +const dcpSystemEventExtraLen = 13 const bufferAckThreshold = 0.2 const opaqueOpen = 0xBEAF0001 const opaqueFailover = 0xDEADBEEF @@ -27,32 +28,6 @@ const uprDefaultNoopInterval = 120 // Counter on top of opaqueOpen that others can draw from for open and control msgs var opaqueOpenCtrlWell uint32 = opaqueOpen -// UprEvent memcached events for UPR streams. -type UprEvent struct { - Opcode gomemcached.CommandCode // Type of event - Status gomemcached.Status // Response status - VBucket uint16 // VBucket this event applies to - DataType uint8 // data type - Opaque uint16 // 16 MSB of opaque - VBuuid uint64 // This field is set by downstream - Flags uint32 // Item flags - Expiry uint32 // Item expiration time - Key, Value []byte // Item key/value - OldValue []byte // TODO: TBD: old document value - Cas uint64 // CAS value of the item - Seqno uint64 // sequence number of the mutation - RevSeqno uint64 // rev sequence number : deletions - LockTime uint32 // Lock time - MetadataSize uint16 // Metadata size - SnapstartSeq uint64 // start sequence number of this snapshot - SnapendSeq uint64 // End sequence number of the snapshot - SnapshotType uint32 // 0: disk 1: memory - FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number - Error error // Error value in case of a failure - ExtMeta []byte - AckSize uint32 // The number of bytes that can be Acked to DCP -} - type PriorityType string // high > medium > disabled > low @@ -63,13 +38,39 @@ const ( PriorityHigh PriorityType = "high" ) +type DcpStreamType int32 + +var UninitializedStream DcpStreamType = -1 + +const ( + NonCollectionStream DcpStreamType = 0 + CollectionsNonStreamId DcpStreamType = iota + CollectionsStreamId DcpStreamType = iota +) + +func (t DcpStreamType) String() string { + switch t { + case UninitializedStream: + return "Un-Initialized Stream" + case NonCollectionStream: + return "Traditional Non-Collection Stream" + case CollectionsNonStreamId: + return "Collections Stream without StreamID" + case CollectionsStreamId: + return "Collection Stream with StreamID" + default: + return "Unknown Stream Type" + } +} + // UprStream is per stream data structure over an UPR Connection. type UprStream struct { - Vbucket uint16 // Vbucket id - Vbuuid uint64 // vbucket uuid - StartSeq uint64 // start sequence number - EndSeq uint64 // end sequence number - connected bool + Vbucket uint16 // Vbucket id + Vbuuid uint64 // vbucket uuid + StartSeq uint64 // start sequence number + EndSeq uint64 // end sequence number + connected bool + StreamType DcpStreamType } type FeedState int @@ -113,6 +114,7 @@ type UprFeatures struct { IncludeDeletionTime bool DcpPriority PriorityType EnableExpiry bool + EnableStreamId bool } /** @@ -274,9 +276,15 @@ type UprFeed struct { // if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP // if flag is false, upr feed will track how many bytes it has sent to client // and use that to determine whether/when to send ack to DCP - ackByClient bool - feedState FeedState - muFeedState sync.RWMutex + ackByClient bool + feedState FeedState + muFeedState sync.RWMutex + activatedFeatures UprFeatures + collectionEnabled bool // This is needed separately because parsing depends on this + // DCP StreamID allows multiple filtered collection streams to share a single DCP Stream + // It is not allowed once a regular/legacy stream was started originally + streamsType DcpStreamType + initStreamTypeOnce sync.Once } // Exported interface - to allow for mocking @@ -296,6 +304,9 @@ type UprFeedIface interface { UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error // Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response SetPriorityAsync(p PriorityType) error + + // Various Collection-Type RequestStreams + UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error } type UprStats struct { @@ -305,9 +316,6 @@ type UprStats struct { TotalSnapShot uint64 } -// FailoverLog containing vvuid and sequnce number -type FailoverLog [][2]uint64 - // error codes var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog") @@ -320,76 +328,6 @@ func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) { return vbuuid, seqno, ErrorInvalidLog } -func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent { - event := &UprEvent{ - Opcode: rq.Opcode, - VBucket: stream.Vbucket, - VBuuid: stream.Vbuuid, - Key: rq.Key, - Value: rq.Body, - Cas: rq.Cas, - ExtMeta: rq.ExtMeta, - DataType: rq.DataType, - } - - // set AckSize for events that need to be acked to DCP, - // i.e., events with CommandCodes that need to be buffered in DCP - if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok { - event.AckSize = uint32(bytesReceivedFromDCP) - } - - // 16 LSBits are used by client library to encode vbucket number. - // 16 MSBits are left for application to multiplex on opaque value. - event.Opaque = appOpaque(rq.Opaque) - - if len(rq.Extras) >= uprMutationExtraLen && - event.Opcode == gomemcached.UPR_MUTATION { - - event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) - event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20]) - event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24]) - event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28]) - event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30]) - - } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen && - event.Opcode == gomemcached.UPR_DELETION { - - event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) - event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20]) - - } else if len(rq.Extras) >= uprDeletetionExtraLen && - event.Opcode == gomemcached.UPR_DELETION || - event.Opcode == gomemcached.UPR_EXPIRATION { - - event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) - event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18]) - - } else if len(rq.Extras) >= uprSnapshotExtraLen && - event.Opcode == gomemcached.UPR_SNAPSHOT { - - event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8]) - event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16]) - event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20]) - } - - return event -} - -func (event *UprEvent) String() string { - name := gomemcached.CommandNames[event.Opcode] - if name == "" { - name = fmt.Sprintf("#%d", event.Opcode) - } - return name -} - -func (event *UprEvent) IsSnappyDataType() bool { - return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0) -} - func (feed *UprFeed) sendCommands(mc *Client) { transmitCh := feed.transmitCh transmitCl := feed.transmitCl @@ -420,6 +358,10 @@ func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) erro feed.muVbstreams.Lock() defer feed.muVbstreams.Unlock() + if feed.collectionEnabled { + stream.StreamType = feed.streamsType + } + // Set this stream as the officially connected stream for this vb stream.connected = true feed.vbstreams[vbno] = stream @@ -440,14 +382,15 @@ func (mc *Client) NewUprFeed() (*UprFeed, error) { } func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) { - feed := &UprFeed{ - conn: mc, - closer: make(chan bool, 1), - vbstreams: make(map[uint16]*UprStream), - transmitCh: make(chan *gomemcached.MCRequest), - transmitCl: make(chan bool), - ackByClient: ackByClient, + conn: mc, + closer: make(chan bool, 1), + vbstreams: make(map[uint16]*UprStream), + transmitCh: make(chan *gomemcached.MCRequest), + transmitCl: make(chan bool), + ackByClient: ackByClient, + collectionEnabled: mc.CollectionEnabled(), + streamsType: UninitializedStream, } feed.negotiator.initialize() @@ -642,7 +585,22 @@ func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, featu activatedFeatures.EnableExpiry = true } + if features.EnableStreamId { + rq := &gomemcached.MCRequest{ + Opcode: gomemcached.UPR_CONTROL, + Key: []byte("enable_stream_id"), + Body: []byte("true"), + Opaque: getUprOpenCtrlOpaque(), + } + err = sendMcRequestSync(feed.conn, rq) + if err != nil { + return + } + activatedFeatures.EnableStreamId = true + } + // everything is ok so far, set upr feed to open state + feed.activatedFeatures = activatedFeatures feed.setOpen() return } @@ -689,10 +647,60 @@ func (mc *Client) UprGetFailoverLog( func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { + return feed.UprRequestCollectionsStream(vbno, opaqueMSB, flags, vuuid, startSequence, endSequence, snapStart, snapEnd, nil) +} + +func (feed *UprFeed) initStreamType(filter *CollectionsFilter) (err error) { + if filter != nil && filter.UseStreamId && !feed.activatedFeatures.EnableStreamId { + err = fmt.Errorf("Cannot use streamID based filter if the feed was not started with the streamID feature") + return + } + + streamInitFunc := func() { + if feed.streamsType != UninitializedStream { + // Shouldn't happen + err = fmt.Errorf("The current feed has already been started in %v mode", feed.streamsType.String()) + } else { + if !feed.collectionEnabled { + feed.streamsType = NonCollectionStream + } else { + if filter != nil && filter.UseStreamId { + feed.streamsType = CollectionsStreamId + } else { + feed.streamsType = CollectionsNonStreamId + } + } + } + } + feed.initStreamTypeOnce.Do(streamInitFunc) + return +} + +func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, + vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, filter *CollectionsFilter) error { + + err := feed.initStreamType(filter) + if err != nil { + return err + } + + var mcRequestBody []byte + if filter != nil { + err = filter.IsValid() + if err != nil { + return err + } + mcRequestBody, err = filter.ToStreamReqBody() + if err != nil { + return err + } + } + rq := &gomemcached.MCRequest{ Opcode: gomemcached.UPR_STREAMREQ, VBucket: vbno, Opaque: composeOpaque(vbno, opaqueMSB), + Body: mcRequestBody, } rq.Extras = make([]byte, 48) // #Extras @@ -700,15 +708,15 @@ func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0)) binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence) binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence) - binary.BigEndian.PutUint64(rq.Extras[24:32], vuuid) + binary.BigEndian.PutUint64(rq.Extras[24:32], vbuuid) binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart) binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd) - feed.negotiator.registerRequest(vbno, opaqueMSB, vuuid, startSequence, endSequence) + feed.negotiator.registerRequest(vbno, opaqueMSB, vbuuid, startSequence, endSequence) // Any client that has ever called this method, regardless of return code, // should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit. - if err := feed.conn.Transmit(rq); err != nil { + if err = feed.conn.Transmit(rq); err != nil { logging.Errorf("Error in StreamRequest %s", err.Error()) // If an error occurs during transmit, then the UPRFeed will keep the stream // in the vbstreams map. This is to prevent nil lookup from any previously @@ -973,6 +981,12 @@ loop: if err := feed.conn.TransmitResponse(noop); err != nil { logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error()) } + case gomemcached.DCP_SYSTEM_EVENT: + if stream == nil { + logging.Infof("Stream not found for vb %d: %#v", vb, pkt) + break loop + } + event = makeUprEvent(pkt, stream, bytes) default: logging.Infof("Recived an unknown response for vbucket %d", vb) } diff --git a/vendor/github.com/couchbase/gomemcached/flexibleFraming.go b/vendor/github.com/couchbase/gomemcached/flexibleFraming.go new file mode 100644 index 0000000000..6f7540312e --- /dev/null +++ b/vendor/github.com/couchbase/gomemcached/flexibleFraming.go @@ -0,0 +1,381 @@ +package gomemcached + +import ( + "encoding/binary" + "fmt" +) + +type FrameObjType int + +const ( + FrameBarrier FrameObjType = iota + FrameDurability FrameObjType = iota + FrameDcpStreamId FrameObjType = iota + FrameOpenTracing FrameObjType = iota +) + +type FrameInfo struct { + ObjId FrameObjType + ObjLen int + ObjData []byte +} + +var ErrorInvalidOp error = fmt.Errorf("Specified method is not applicable") +var ErrorObjLenNotMatch error = fmt.Errorf("Object length does not match data") + +func (f *FrameInfo) Validate() error { + switch f.ObjId { + case FrameBarrier: + if f.ObjLen != 0 { + return fmt.Errorf("Invalid FrameBarrier - length is %v\n", f.ObjLen) + } else if f.ObjLen != len(f.ObjData) { + return ErrorObjLenNotMatch + } + case FrameDurability: + if f.ObjLen != 1 && f.ObjLen != 3 { + return fmt.Errorf("Invalid FrameDurability - length is %v\n", f.ObjLen) + } else if f.ObjLen != len(f.ObjData) { + return ErrorObjLenNotMatch + } + case FrameDcpStreamId: + if f.ObjLen != 2 { + return fmt.Errorf("Invalid FrameDcpStreamId - length is %v\n", f.ObjLen) + } else if f.ObjLen != len(f.ObjData) { + return ErrorObjLenNotMatch + } + case FrameOpenTracing: + if f.ObjLen == 0 { + return fmt.Errorf("Invalid FrameOpenTracing - length must be > 0") + } else if f.ObjLen != len(f.ObjData) { + return ErrorObjLenNotMatch + } + default: + return fmt.Errorf("Unknown FrameInfo type") + } + return nil +} + +func (f *FrameInfo) GetStreamId() (uint16, error) { + if f.ObjId != FrameDcpStreamId { + return 0, ErrorInvalidOp + } + + var output uint16 + output = uint16(f.ObjData[0]) + output = output << 8 + output |= uint16(f.ObjData[1]) + return output, nil +} + +type DurabilityLvl uint8 + +const ( + DuraInvalid DurabilityLvl = iota // Not used (0x0) + DuraMajority DurabilityLvl = iota // (0x01) + DuraMajorityAndPersistOnMaster DurabilityLvl = iota // (0x02) + DuraPersistToMajority DurabilityLvl = iota // (0x03) +) + +func (f *FrameInfo) GetDurabilityRequirements() (lvl DurabilityLvl, timeoutProvided bool, timeoutMs uint16, err error) { + if f.ObjId != FrameDurability { + err = ErrorInvalidOp + return + } + if f.ObjLen != 1 && f.ObjLen != 3 { + err = ErrorObjLenNotMatch + return + } + + lvl = DurabilityLvl(uint8(f.ObjData[0])) + + if f.ObjLen == 3 { + timeoutProvided = true + timeoutMs = binary.BigEndian.Uint16(f.ObjData[1:2]) + } + + return +} + +func incrementMarker(bitsToBeIncremented, byteIncrementCnt *int, framingElen, curObjIdx int) (int, error) { + for *bitsToBeIncremented >= 8 { + *byteIncrementCnt++ + *bitsToBeIncremented -= 8 + } + marker := curObjIdx + *byteIncrementCnt + if marker > framingElen { + return -1, fmt.Errorf("Out of bounds") + } + return marker, nil +} + +// Right now, halfByteRemaining will always be false, because ObjID and Len haven't gotten that large yet +func (f *FrameInfo) Bytes() (output []byte, halfByteRemaining bool) { + // ObjIdentifier - 4 bits + ObjLength - 4 bits + var idAndLen uint8 + idAndLen |= uint8(f.ObjId) << 4 + idAndLen |= uint8(f.ObjLen) + output = append(output, byte(idAndLen)) + + // Rest is Data + output = append(output, f.ObjData...) + return +} + +func parseFrameInfoObjects(buf []byte, framingElen int) (objs []FrameInfo, err error, halfByteRemaining bool) { + var curObjIdx int + var byteIncrementCnt int + var bitsToBeIncremented int + var marker int + + // Parse frameInfo objects + for curObjIdx = 0; curObjIdx < framingElen; curObjIdx += byteIncrementCnt { + byteIncrementCnt = 0 + var oneFrameObj FrameInfo + + // First get the objId + // ------------------------- + var objId int + var objHeader uint8 = buf[curObjIdx] + var objIdentifierRaw uint8 + if bitsToBeIncremented == 0 { + // ObjHeader + // 0 1 2 3 4 5 6 7 + // ^-----^ + // ObjIdentifierRaw + objIdentifierRaw = (objHeader & 0xf0) >> 4 + } else { + // ObjHeader + // 0 1 2 3 4 5 6 7 + // ^-----^ + // ObjIdentifierRaw + objIdentifierRaw = (objHeader & 0x0f) + } + bitsToBeIncremented += 4 + + marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) + if err != nil { + return + } + + // Value is 0-14 + objId = int(objIdentifierRaw & 0xe) + // If bit 15 is set, ID is 15 + value of next byte + if objIdentifierRaw&0x1 > 0 { + if bitsToBeIncremented > 0 { + // ObjHeader + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // ^-----^ ^---------------^ + // ObjId1 Extension + // ^ marker + buffer := uint16(buf[marker]) + buffer = buffer << 8 + buffer |= uint16(buf[marker+1]) + var extension uint8 = uint8(buffer & 0xff0 >> 4) + objId += int(extension) + } else { + // ObjHeader + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // ^-----^ ^-------------------^ + // ObjId1 extension + // ^ marker + var extension uint8 = uint8(buf[marker]) + objId += int(extension) + } + bitsToBeIncremented += 8 + } + + marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) + if err != nil { + return + } + oneFrameObj.ObjId = FrameObjType(objId) + + // Then get the obj length + // ------------------------- + var objLenRaw uint8 + var objLen int + if bitsToBeIncremented > 0 { + // ObjHeader + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // ^ ^---------^ + // marker objLen + objLenRaw = uint8(buf[marker]) & 0x0f + } else { + // ObjHeader + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 + // ^--------^ + // objLen + // ^ marker + objLenRaw = uint8(buf[marker]) & 0xf0 >> 4 + } + bitsToBeIncremented += 4 + + marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) + if err != nil { + return + } + + // Length is 0-14 + objLen = int(objLenRaw & 0xe) + // If bit 15 is set, lenghth is 15 + value of next byte + if objLenRaw&0x1 > 0 { + if bitsToBeIncremented == 0 { + // ObjHeader + // 12 13 14 15 16 17 18 19 20 21 22 23 + // ^---------^ ^--------------------^ + // objLen extension + // ^ marker + var extension uint8 = uint8(buf[marker]) + objLen += int(extension) + } else { + // ObjHeader + // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 + // ^--------^ ^---------------------^ + // objLen extension + // ^ marker var buffer uint16 + buffer := uint16(buf[marker]) + buffer = buffer << 8 + buffer |= uint16(buf[marker+1]) + var extension uint8 = uint8(buffer & 0xff0 >> 4) + objLen += int(extension) + } + bitsToBeIncremented += 8 + } + + marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) + if err != nil { + return + } + oneFrameObj.ObjLen = objLen + + // The rest is N-bytes of data based on the length + if bitsToBeIncremented == 0 { + // No weird alignment needed + oneFrameObj.ObjData = buf[marker : marker+objLen] + } else { + // 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 + // ^--------^ ^---------------------^ ^---------> + // objLen extension data + // ^ marker + oneFrameObj.ObjData = ShiftByteSliceLeft4Bits(buf[marker : marker+objLen+1]) + } + err = oneFrameObj.Validate() + if err != nil { + return + } + objs = append(objs, oneFrameObj) + + bitsToBeIncremented += 8 * objLen + marker, err = incrementMarker(&bitsToBeIncremented, &byteIncrementCnt, framingElen, curObjIdx) + } + + if bitsToBeIncremented > 0 { + halfByteRemaining = true + } + return +} + +func ShiftByteSliceLeft4Bits(slice []byte) (replacement []byte) { + var buffer uint16 + var i int + sliceLen := len(slice) + + if sliceLen < 2 { + // Let's not shift less than 16 bits + return + } + + replacement = make([]byte, sliceLen, cap(slice)) + + for i = 0; i < sliceLen-1; i++ { + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // ^-----^ ^---------------^ ^----------- + // garbage data byte 0 data byte 1 + buffer = uint16(slice[i]) + buffer = buffer << 8 + buffer |= uint16(slice[i+1]) + replacement[i] = uint8(buffer & 0xff0 >> 4) + } + + if i < sliceLen { + lastByte := slice[sliceLen-1] + lastByte = lastByte << 4 + replacement[i] = lastByte + } + return +} + +// The following is used to theoretically support frameInfo ObjID extensions +// for completeness, but they are not very efficient though +func ShiftByteSliceRight4Bits(slice []byte) (replacement []byte) { + var buffer uint16 + var i int + var leftovers uint8 // 4 bits only + var replacementUnit uint16 + var first bool = true + var firstLeftovers uint8 + var lastLeftovers uint8 + sliceLen := len(slice) + + if sliceLen < 2 { + // Let's not shift less than 16 bits + return + } + + if slice[sliceLen-1]&0xf == 0 { + replacement = make([]byte, sliceLen, cap(slice)) + } else { + replacement = make([]byte, sliceLen+1, cap(slice)+1) + } + + for i = 0; i < sliceLen-1; i++ { + buffer = binary.BigEndian.Uint16(slice[i : i+2]) + // (buffer) + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // ^-------------^ ^-------------------^ + // data byte 0 data byte 1 + // + // into + // + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 + // ^-----^ ^---------------^ ^--------------------^ ^----------^ + // zeroes data byte 0 data byte 1 zeroes + + if first { + // The leftover OR'ing will overwrite the first 4 bits of data byte 0. Save them + firstLeftovers = uint8(buffer & 0xf000 >> 12) + first = false + } + replacementUnit = 0 + replacementUnit |= uint16(leftovers) << 12 + replacementUnit |= (buffer & 0xff00) >> 4 // data byte 0 + replacementUnit |= buffer & 0xff >> 4 // data byte 1 first 4 bits + lastLeftovers = uint8(buffer&0xf) << 4 + + replacement[i+1] = byte(replacementUnit) + + leftovers = uint8((buffer & 0x000f) << 4) + } + + replacement[0] = byte(uint8(replacement[0]) | firstLeftovers) + if lastLeftovers > 0 { + replacement[sliceLen] = byte(lastLeftovers) + } + return +} + +func Merge2HalfByteSlices(src1, src2 []byte) (output []byte) { + src1Len := len(src1) + src2Len := len(src2) + output = make([]byte, src1Len+src2Len-1) + + var mergeByte uint8 = src1[src1Len-1] + mergeByte |= uint8(src2[0]) + + copy(output, src1) + copy(output[src1Len:], src2[1:]) + + output[src1Len-1] = byte(mergeByte) + + return +} diff --git a/vendor/github.com/couchbase/gomemcached/mc_constants.go b/vendor/github.com/couchbase/gomemcached/mc_constants.go index 32f4f51852..11f383b8ff 100644 --- a/vendor/github.com/couchbase/gomemcached/mc_constants.go +++ b/vendor/github.com/couchbase/gomemcached/mc_constants.go @@ -6,8 +6,10 @@ import ( ) const ( - REQ_MAGIC = 0x80 - RES_MAGIC = 0x81 + REQ_MAGIC = 0x80 + RES_MAGIC = 0x81 + FLEX_MAGIC = 0x08 + FLEX_RES_MAGIC = 0x18 ) // CommandCode for memcached packets. @@ -99,6 +101,8 @@ const ( SUBDOC_GET = CommandCode(0xc5) // Get subdoc. Returns with xattrs SUBDOC_MULTI_LOOKUP = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta. + DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred + ) // command codes that are counted toward DCP control buffer diff --git a/vendor/github.com/couchbase/gomemcached/mc_req.go b/vendor/github.com/couchbase/gomemcached/mc_req.go index 3ff67ab9a7..35d0fe2daf 100644 --- a/vendor/github.com/couchbase/gomemcached/mc_req.go +++ b/vendor/github.com/couchbase/gomemcached/mc_req.go @@ -25,11 +25,17 @@ type MCRequest struct { Extras, Key, Body, ExtMeta []byte // Datatype identifier DataType uint8 + // len() calls are expensive - cache this in case for collection + Keylen int + // Flexible Framing Extras + FramingExtras []FrameInfo + // Stored length of incoming framing extras + FramingElen int } // Size gives the number of bytes this request requires. func (req *MCRequest) Size() int { - return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta) + return HDR_LEN + len(req.Extras) + len(req.Key) + len(req.Body) + len(req.ExtMeta) + req.FramingElen } // A debugging string representation of this request @@ -38,7 +44,23 @@ func (req MCRequest) String() string { req.Opcode, len(req.Body), req.Key) } -func (req *MCRequest) fillHeaderBytes(data []byte) int { +func (req *MCRequest) fillRegularHeaderBytes(data []byte) int { + // Byte/ 0 | 1 | 2 | 3 | + // / | | | | + // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| + // +---------------+---------------+---------------+---------------+ + // 0| Magic | Opcode | Key length | + // +---------------+---------------+---------------+---------------+ + // 4| Extras length | Data type | vbucket id | + // +---------------+---------------+---------------+---------------+ + // 8| Total body length | + // +---------------+---------------+---------------+---------------+ + // 12| Opaque | + // +---------------+---------------+---------------+---------------+ + // 16| CAS | + // | | + // +---------------+---------------+---------------+---------------+ + // Total 24 bytes pos := 0 data[pos] = REQ_MAGIC @@ -84,16 +106,130 @@ func (req *MCRequest) fillHeaderBytes(data []byte) int { copy(data[pos:pos+len(req.Key)], req.Key) pos += len(req.Key) } - return pos } +// Returns pos and if trailing by half byte +func (req *MCRequest) fillFlexHeaderBytes(data []byte) (int, bool) { + + // Byte/ 0 | 1 | 2 | 3 | + // / | | | | + // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| + // +---------------+---------------+---------------+---------------+ + // 0| Magic (0x08) | Opcode | Framing extras| Key Length | + // +---------------+---------------+---------------+---------------+ + // 4| Extras length | Data type | vbucket id | + // +---------------+---------------+---------------+---------------+ + // 8| Total body length | + // +---------------+---------------+---------------+---------------+ + // 12| Opaque | + // +---------------+---------------+---------------+---------------+ + // 16| CAS | + // | | + // +---------------+---------------+---------------+---------------+ + // Total 24 bytes + + data[0] = FLEX_MAGIC + data[1] = byte(req.Opcode) + data[2] = byte(req.FramingElen) + data[3] = byte(req.Keylen) + elen := len(req.Extras) + data[4] = byte(elen) + if req.DataType != 0 { + data[5] = byte(req.DataType) + } + binary.BigEndian.PutUint16(data[6:8], req.VBucket) + binary.BigEndian.PutUint32(data[8:12], + uint32(len(req.Body)+req.Keylen+elen+len(req.ExtMeta)+req.FramingElen)) + binary.BigEndian.PutUint32(data[12:16], req.Opaque) + if req.Cas != 0 { + binary.BigEndian.PutUint64(data[16:24], req.Cas) + } + pos := HDR_LEN + + // Add framing infos + var framingExtras []byte + var outputBytes []byte + var mergeModeSrc []byte + var frameBytes int + var halfByteMode bool + var mergeMode bool + for _, frameInfo := range req.FramingExtras { + if !mergeMode { + outputBytes, halfByteMode = frameInfo.Bytes() + if !halfByteMode { + framingExtras = append(framingExtras, outputBytes...) + frameBytes += len(outputBytes) + } else { + mergeMode = true + mergeModeSrc = outputBytes + } + } else { + outputBytes, halfByteMode = frameInfo.Bytes() + outputBytes := ShiftByteSliceRight4Bits(outputBytes) + if halfByteMode { + // Previous halfbyte merge with this halfbyte will result in a complete byte + mergeMode = false + outputBytes = Merge2HalfByteSlices(mergeModeSrc, outputBytes) + framingExtras = append(framingExtras, outputBytes...) + frameBytes += len(outputBytes) + } else { + // Merge half byte with a non-half byte will result in a combined half-byte that will + // become the source for the next iteration + mergeModeSrc = Merge2HalfByteSlices(mergeModeSrc, outputBytes) + } + } + } + + if mergeMode { + // Commit the temporary merge area into framingExtras + framingExtras = append(framingExtras, mergeModeSrc...) + frameBytes += len(mergeModeSrc) + } + + copy(data[pos:pos+frameBytes], framingExtras) + + pos += frameBytes + + // Add Extras + if len(req.Extras) > 0 { + if mergeMode { + outputBytes = ShiftByteSliceRight4Bits(req.Extras) + data = Merge2HalfByteSlices(data, outputBytes) + } else { + copy(data[pos:pos+elen], req.Extras) + } + pos += elen + } + + // Add keys + if req.Keylen > 0 { + if mergeMode { + outputBytes = ShiftByteSliceRight4Bits(req.Key) + data = Merge2HalfByteSlices(data, outputBytes) + } else { + copy(data[pos:pos+req.Keylen], req.Key) + } + pos += req.Keylen + } + + return pos, mergeMode +} + +func (req *MCRequest) FillHeaderBytes(data []byte) (int, bool) { + if req.FramingElen == 0 { + return req.fillRegularHeaderBytes(data), false + } else { + return req.fillFlexHeaderBytes(data) + } +} + // HeaderBytes will return the wire representation of the request header // (with the extras and key). func (req *MCRequest) HeaderBytes() []byte { - data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key)) + data := make([]byte, HDR_LEN+len(req.Extras)+len(req.Key)+req.FramingElen) - req.fillHeaderBytes(data) + req.FillHeaderBytes(data) return data } @@ -102,16 +238,27 @@ func (req *MCRequest) HeaderBytes() []byte { func (req *MCRequest) Bytes() []byte { data := make([]byte, req.Size()) - pos := req.fillHeaderBytes(data) + pos, halfByteMode := req.FillHeaderBytes(data) + // TODO - the halfByteMode should be revisited for a more efficient + // way of doing things if len(req.Body) > 0 { - copy(data[pos:pos+len(req.Body)], req.Body) + if halfByteMode { + shifted := ShiftByteSliceRight4Bits(req.Body) + data = Merge2HalfByteSlices(data, shifted) + } else { + copy(data[pos:pos+len(req.Body)], req.Body) + } } if len(req.ExtMeta) > 0 { - copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta) + if halfByteMode { + shifted := ShiftByteSliceRight4Bits(req.ExtMeta) + data = Merge2HalfByteSlices(data, shifted) + } else { + copy(data[pos+len(req.Body):pos+len(req.Body)+len(req.ExtMeta)], req.ExtMeta) + } } - return data } @@ -130,40 +277,44 @@ func (req *MCRequest) Transmit(w io.Writer) (n int, err error) { return } -// Receive will fill this MCRequest with the data from a reader. -func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) { - if len(hdrBytes) < HDR_LEN { - hdrBytes = []byte{ - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0} - } - n, err := io.ReadFull(r, hdrBytes) - if err != nil { - return n, err - } - - if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC { - return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0]) - } - - klen := int(binary.BigEndian.Uint16(hdrBytes[2:])) - elen := int(hdrBytes[4]) +func (req *MCRequest) receiveHeaderCommon(hdrBytes []byte) (elen, totalBodyLen int) { + elen = int(hdrBytes[4]) // Data type at 5 req.DataType = uint8(hdrBytes[5]) req.Opcode = CommandCode(hdrBytes[1]) // Vbucket at 6:7 req.VBucket = binary.BigEndian.Uint16(hdrBytes[6:]) - totalBodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:])) + totalBodyLen = int(binary.BigEndian.Uint32(hdrBytes[8:])) req.Opaque = binary.BigEndian.Uint32(hdrBytes[12:]) req.Cas = binary.BigEndian.Uint64(hdrBytes[16:]) + return +} +func (req *MCRequest) receiveRegHeader(hdrBytes []byte) (elen, totalBodyLen int) { + elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes) + req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:])) + return +} + +func (req *MCRequest) receiveFlexibleFramingHeader(hdrBytes []byte) (elen, totalBodyLen, framingElen int) { + elen, totalBodyLen = req.receiveHeaderCommon(hdrBytes) + + // For flexible framing header, key length is a single byte at byte index 3 + req.Keylen = int(binary.BigEndian.Uint16(hdrBytes[2:]) & 0x0ff) + // Flexible framing lengh is a single byte at index 2 + framingElen = int(binary.BigEndian.Uint16(hdrBytes[2:]) >> 8) + req.FramingElen = framingElen + return +} + +func (req *MCRequest) populateRegularBody(r io.Reader, totalBodyLen, elen int) (int, error) { + var m int + var err error if totalBodyLen > 0 { buf := make([]byte, totalBodyLen) - m, err := io.ReadFull(r, buf) - n += m + m, err = io.ReadFull(r, buf) if err == nil { if req.Opcode >= TAP_MUTATION && req.Opcode <= TAP_CHECKPOINT_END && @@ -175,7 +326,7 @@ func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) { } req.Extras = buf[0:elen] - req.Key = buf[elen : klen+elen] + req.Key = buf[elen : req.Keylen+elen] // get the length of extended metadata extMetaLen := 0 @@ -183,15 +334,149 @@ func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) { extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30])) } - bodyLen := totalBodyLen - klen - elen - extMetaLen + bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen if bodyLen > MaxBodyLen { - return n, fmt.Errorf("%d is too big (max %d)", + return m, fmt.Errorf("%d is too big (max %d)", bodyLen, MaxBodyLen) } - req.Body = buf[klen+elen : klen+elen+bodyLen] - req.ExtMeta = buf[klen+elen+bodyLen:] + req.Body = buf[req.Keylen+elen : req.Keylen+elen+bodyLen] + req.ExtMeta = buf[req.Keylen+elen+bodyLen:] } } - return n, err + return m, err +} + +func (req *MCRequest) populateFlexBody(r io.Reader, totalBodyLen, elen, framingElen int) (int, error) { + var m int + var err error + if totalBodyLen > 0 { + buf := make([]byte, totalBodyLen) + m, err = io.ReadFull(r, buf) + if err != nil { + return m, err + } + err = req.populateFlexBodyInternal(buf, totalBodyLen, elen, framingElen) + } + return m, err +} + +func (req *MCRequest) populateFlexBodyInternal(buf []byte, totalBodyLen, elen, framingElen int) error { + var halfByteOffset bool + var err error + if framingElen > 0 { + var objs []FrameInfo + objs, err, halfByteOffset = parseFrameInfoObjects(buf, framingElen) + if err != nil { + return err + } + req.FramingExtras = objs + } + + err = req.populateFlexBodyAfterFrames(buf, totalBodyLen, elen, framingElen, halfByteOffset) + if err != nil { + return err + } + + return nil +} + +func (req *MCRequest) populateFlexBodyAfterFrames(buf []byte, totalBodyLen, elen, framingElen int, halfByteOffset bool) error { + var idxCursor int = framingElen + if req.Opcode >= TAP_MUTATION && req.Opcode <= TAP_CHECKPOINT_END && len(buf[idxCursor:]) > 1 { + // In these commands there is "engine private" + // data at the end of the extras. The first 2 + // bytes of extra data give its length. + if !halfByteOffset { + elen += int(binary.BigEndian.Uint16(buf[idxCursor:])) + } else { + // 0 1 2 3 4 .... 19 20 21 22 ... 32 + // ^-----^ ^-------^ ^------------^ + // offset data do not care + var buffer uint32 = binary.BigEndian.Uint32(buf[idxCursor:]) + buffer &= 0xffff000 + elen += int(buffer >> 12) + } + } + + // Get the extras + if !halfByteOffset { + req.Extras = buf[idxCursor : idxCursor+elen] + } else { + preShift := buf[idxCursor : idxCursor+elen+1] + req.Extras = ShiftByteSliceLeft4Bits(preShift) + } + idxCursor += elen + + // Get the Key + if !halfByteOffset { + req.Key = buf[idxCursor : idxCursor+req.Keylen] + } else { + preShift := buf[idxCursor : idxCursor+req.Keylen+1] + req.Key = ShiftByteSliceLeft4Bits(preShift) + } + idxCursor += req.Keylen + + // get the length of extended metadata + extMetaLen := 0 + if elen > 29 { + extMetaLen = int(binary.BigEndian.Uint16(req.Extras[28:30])) + } + idxCursor += extMetaLen + + bodyLen := totalBodyLen - req.Keylen - elen - extMetaLen - framingElen + if bodyLen > MaxBodyLen { + return fmt.Errorf("%d is too big (max %d)", + bodyLen, MaxBodyLen) + } + + if !halfByteOffset { + req.Body = buf[idxCursor : idxCursor+bodyLen] + idxCursor += bodyLen + } else { + preShift := buf[idxCursor : idxCursor+bodyLen+1] + req.Body = ShiftByteSliceLeft4Bits(preShift) + idxCursor += bodyLen + } + + if extMetaLen > 0 { + if !halfByteOffset { + req.ExtMeta = buf[idxCursor:] + } else { + preShift := buf[idxCursor:] + req.ExtMeta = ShiftByteSliceLeft4Bits(preShift) + } + } + + return nil +} + +// Receive will fill this MCRequest with the data from a reader. +func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error) { + if len(hdrBytes) < HDR_LEN { + hdrBytes = []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0} + } + n, err := io.ReadFull(r, hdrBytes) + if err != nil { + fmt.Printf("Err %v\n", err) + return n, err + } + + switch hdrBytes[0] { + case RES_MAGIC: + fallthrough + case REQ_MAGIC: + elen, totalBodyLen := req.receiveRegHeader(hdrBytes) + bodyRead, err := req.populateRegularBody(r, totalBodyLen, elen) + return n + bodyRead, err + case FLEX_MAGIC: + elen, totalBodyLen, framingElen := req.receiveFlexibleFramingHeader(hdrBytes) + bodyRead, err := req.populateFlexBody(r, totalBodyLen, elen, framingElen) + return n + bodyRead, err + default: + return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0]) + } } diff --git a/vendor/github.com/couchbase/gomemcached/mc_res.go b/vendor/github.com/couchbase/gomemcached/mc_res.go index 2b4cfe1349..f6be989847 100644 --- a/vendor/github.com/couchbase/gomemcached/mc_res.go +++ b/vendor/github.com/couchbase/gomemcached/mc_res.go @@ -153,6 +153,13 @@ func (res *MCResponse) Transmit(w io.Writer) (n int, err error) { // Receive will fill this MCResponse with the data from this reader. func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) { + return res.ReceiveWithBuf(r, hdrBytes, nil) +} + +// ReceiveWithBuf takes an optional pre-allocated []byte buf which +// will be used if its capacity is large enough, otherwise a new +// []byte slice is allocated. +func (res *MCResponse) ReceiveWithBuf(r io.Reader, hdrBytes, buf []byte) (n int, err error) { if len(hdrBytes) < HDR_LEN { hdrBytes = []byte{ 0, 0, 0, 0, 0, 0, 0, 0, @@ -187,7 +194,13 @@ func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) } }() - buf := make([]byte, klen+elen+bodyLen) + bufNeed := klen + elen + bodyLen + if buf != nil && cap(buf) >= bufNeed { + buf = buf[0:bufNeed] + } else { + buf = make([]byte, bufNeed) + } + m, err := io.ReadFull(r, buf) if err == nil { res.Extras = buf[0:elen] diff --git a/vendor/github.com/couchbase/goutils/LICENSE.md b/vendor/github.com/couchbase/goutils/LICENSE.md index a572e246e6..e06d208186 100644 --- a/vendor/github.com/couchbase/goutils/LICENSE.md +++ b/vendor/github.com/couchbase/goutils/LICENSE.md @@ -1,47 +1,202 @@ -COUCHBASE INC. COMMUNITY EDITION LICENSE AGREEMENT +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ -IMPORTANT-READ CAREFULLY: BY CLICKING THE "I ACCEPT" BOX OR INSTALLING, -DOWNLOADING OR OTHERWISE USING THIS SOFTWARE AND ANY ASSOCIATED -DOCUMENTATION, YOU, ON BEHALF OF YOURSELF OR AS AN AUTHORIZED -REPRESENTATIVE ON BEHALF OF AN ENTITY ("LICENSEE") AGREE TO ALL THE -TERMS OF THIS COMMUNITY EDITION LICENSE AGREEMENT (THE "AGREEMENT") -REGARDING YOUR USE OF THE SOFTWARE. YOU REPRESENT AND WARRANT THAT YOU -HAVE FULL LEGAL AUTHORITY TO BIND THE LICENSEE TO THIS AGREEMENT. IF YOU -DO NOT AGREE WITH ALL OF THESE TERMS, DO NOT SELECT THE "I ACCEPT" BOX -AND DO NOT INSTALL, DOWNLOAD OR OTHERWISE USE THE SOFTWARE. THE -EFFECTIVE DATE OF THIS AGREEMENT IS THE DATE ON WHICH YOU CLICK "I -ACCEPT" OR OTHERWISE INSTALL, DOWNLOAD OR USE THE SOFTWARE. + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION -1. License Grant. Couchbase Inc. hereby grants Licensee, free of charge, -the non-exclusive right to use, copy, merge, publish, distribute, -sublicense, and/or sell copies of the Software, and to permit persons to -whom the Software is furnished to do so, subject to Licensee including -the following copyright notice in all copies or substantial portions of -the Software: + 1. Definitions. -Couchbase (r) http://www.Couchbase.com Copyright 2016 Couchbase, Inc. + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. -As used in this Agreement, "Software" means the object code version of -the applicable elastic data management server software provided by -Couchbase Inc. + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. -2. Restrictions. Licensee will not reverse engineer, disassemble, or -decompile the Software (except to the extent such restrictions are -prohibited by law). + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. -3. Support. Couchbase, Inc. will provide Licensee with access to, and -use of, the Couchbase, Inc. support forum available at the following -URL: http://www.couchbase.org/forums/. Couchbase, Inc. may, at its -discretion, modify, suspend or terminate support at any time upon notice -to Licensee. + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. -4. Warranty Disclaimer and Limitation of Liability. THE SOFTWARE IS -PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, -INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -COUCHBASE INC. OR THE AUTHORS OR COPYRIGHT HOLDERS IN THE SOFTWARE BE -LIABLE FOR ANY CLAIM, DAMAGES (IINCLUDING, WITHOUT LIMITATION, DIRECT, -INDIRECT OR CONSEQUENTIAL DAMAGES) OR OTHER LIABILITY, WHETHER IN AN -ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/vendor/modules.txt b/vendor/modules.txt index 92657bb6b5..920dbf8640 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -99,10 +99,10 @@ github.com/boombuler/barcode/qr github.com/boombuler/barcode/utils # github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668 github.com/bradfitz/gomemcache/memcache -# github.com/couchbase/gomemcached v0.0.0-20190515232915-c4b4ca0eb21d +# github.com/couchbase/gomemcached v0.0.0-20191004160342-7b5da2ec40b2 github.com/couchbase/gomemcached github.com/couchbase/gomemcached/client -# github.com/couchbase/goutils v0.0.0-20190315194238-f9d42b11473b +# github.com/couchbase/goutils v0.0.0-20191018232750-b49639060d85 github.com/couchbase/goutils/logging github.com/couchbase/goutils/scramsha # github.com/couchbase/vellum v0.0.0-20190829182332-ef2e028c01fd