2022-10-19 21:32:34 +08:00

701 lines
22 KiB

// Copyright (C) MongoDB, Inc. 2017-present.
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package mongo
import (
var (
// ErrMissingResumeToken indicates that a change stream notification from the server did not contain a resume token.
ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")
// ErrNilCursor indicates that the underlying cursor for the change stream is nil.
ErrNilCursor = errors.New("cursor is nil")
minResumableLabelWireVersion int32 = 9 // Wire version at which the server includes the resumable error label
networkErrorLabel = "NetworkError"
resumableErrorLabel = "ResumableChangeStreamError"
errorCursorNotFound int32 = 43 // CursorNotFound error code
// Allowlist of error codes that are considered resumable.
resumableChangeStreamErrors = map[int32]struct{}{
6: {}, // HostUnreachable
7: {}, // HostNotFound
89: {}, // NetworkTimeout
91: {}, // ShutdownInProgress
189: {}, // PrimarySteppedDown
262: {}, // ExceededTimeLimit
9001: {}, // SocketException
10107: {}, // NotPrimary
11600: {}, // InterruptedAtShutdown
11602: {}, // InterruptedDueToReplStateChange
13435: {}, // NotPrimaryNoSecondaryOK
13436: {}, // NotPrimaryOrSecondary
63: {}, // StaleShardVersion
150: {}, // StaleEpoch
13388: {}, // StaleConfig
234: {}, // RetryChangeStream
133: {}, // FailedToSatisfyReadPreference
// ChangeStream is used to iterate over a stream of events. Each event can be decoded into a Go type via the Decode
// method or accessed as raw BSON via the Current field. This type is not goroutine safe and must not be used
// concurrently by multiple goroutines. For more information about change streams, see
// https://www.mongodb.com/docs/manual/changeStreams/.
type ChangeStream struct {
// Current is the BSON bytes of the current event. This property is only valid until the next call to Next or
// TryNext. If continued access is required, a copy must be made.
Current bson.Raw
aggregate *operation.Aggregate
pipelineSlice []bsoncore.Document
pipelineOptions map[string]bsoncore.Value
cursor changeStreamCursor
cursorOptions driver.CursorOptions
batch []bsoncore.Document
resumeToken bson.Raw
err error
sess *session.Client
client *Client
registry *bsoncodec.Registry
streamType StreamType
options *options.ChangeStreamOptions
selector description.ServerSelector
operationTime *primitive.Timestamp
wireVersion *description.VersionRange
type changeStreamConfig struct {
readConcern *readconcern.ReadConcern
readPreference *readpref.ReadPref
client *Client
registry *bsoncodec.Registry
streamType StreamType
collectionName string
databaseName string
crypt driver.Crypt
func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
if ctx == nil {
ctx = context.Background()
cs := &ChangeStream{
client: config.client,
registry: config.registry,
streamType: config.streamType,
options: options.MergeChangeStreamOptions(opts...),
selector: description.CompositeSelector([]description.ServerSelector{
cursorOptions: config.client.createBaseCursorOptions(),
cs.sess = sessionFromContext(ctx)
if cs.sess == nil && cs.client.sessionPool != nil {
cs.sess, cs.err = session.NewClientSession(cs.client.sessionPool, cs.client.id, session.Implicit)
if cs.err != nil {
return nil, cs.Err()
if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
return nil, cs.Err()
cs.aggregate = operation.NewAggregate(nil).
if cs.options.Collation != nil {
if comment := cs.options.Comment; comment != nil {
commentVal, err := transformValue(cs.registry, comment, true, "comment")
if err != nil {
return nil, err
cs.cursorOptions.Comment = commentVal
if cs.options.BatchSize != nil {
cs.cursorOptions.BatchSize = *cs.options.BatchSize
if cs.options.MaxAwaitTime != nil {
cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
if cs.options.Custom != nil {
// Marshal all custom options before passing to the initial aggregate. Return
// any errors from Marshaling.
customOptions := make(map[string]bsoncore.Value)
for optionName, optionValue := range cs.options.Custom {
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
if err != nil {
cs.err = err
return nil, cs.Err()
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
customOptions[optionName] = optionValueBSON
if cs.options.CustomPipeline != nil {
// Marshal all custom pipeline options before building pipeline slice. Return
// any errors from Marshaling.
cs.pipelineOptions = make(map[string]bsoncore.Value)
for optionName, optionValue := range cs.options.CustomPipeline {
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
if err != nil {
cs.err = err
return nil, cs.Err()
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
cs.pipelineOptions[optionName] = optionValueBSON
switch cs.streamType {
case ClientStream:
case DatabaseStream:
case CollectionStream:
return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
// When starting a change stream, cache startAfter as the first resume token if it is set. If not, cache
// resumeAfter. If neither is set, do not cache a resume token.
resumeToken := cs.options.StartAfter
if resumeToken == nil {
resumeToken = cs.options.ResumeAfter
var marshaledToken bson.Raw
if resumeToken != nil {
if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil {
return nil, cs.Err()
cs.resumeToken = marshaledToken
if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil {
return nil, cs.Err()
var pipelineArr bsoncore.Document
pipelineArr, cs.err = cs.pipelineToBSON()
if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
return nil, cs.Err()
return cs, cs.Err()
func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
return &changeStreamDeployment{
topologyKind: cs.client.deployment.Kind(),
server: server,
conn: connection,
func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
var server driver.Server
var conn driver.Connection
var err error
if server, cs.err = cs.client.deployment.SelectServer(ctx, cs.selector); cs.err != nil {
return cs.Err()
if conn, cs.err = server.Connection(ctx); cs.err != nil {
return cs.Err()
defer conn.Close()
cs.wireVersion = conn.Description().WireVersion
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
if resuming {
csOptDoc, err := cs.createPipelineOptionsDoc()
if err != nil {
return err
pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc)
if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil {
return cs.Err()
cs.pipelineSlice[0] = pipDoc
var plArr bsoncore.Document
if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
return cs.Err()
// If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already
// a Timeout context, honor cs.client.timeout in new Timeout context for change stream operation execution
// and potential retry.
if _, deadlineSet := ctx.Deadline(); !deadlineSet && cs.client.timeout != nil && !internal.IsTimeoutContext(ctx) {
newCtx, cancelFunc := internal.MakeTimeoutContext(ctx, *cs.client.timeout)
// Redefine ctx to be the new timeout-derived context.
ctx = newCtx
// Cancel the timeout-derived context at the end of executeOperation to avoid a context leak.
defer cancelFunc()
if original := cs.aggregate.Execute(ctx); original != nil {
retryableRead := cs.client.retryReads && cs.wireVersion != nil && cs.wireVersion.Max >= 6
if !retryableRead {
cs.err = replaceErrors(original)
return cs.err
cs.err = original
switch tt := original.(type) {
case driver.Error:
if !tt.RetryableRead() {
server, err = cs.client.deployment.SelectServer(ctx, cs.selector)
if err != nil {
conn, err = server.Connection(ctx)
if err != nil {
defer conn.Close()
cs.wireVersion = conn.Description().WireVersion
if cs.wireVersion == nil || cs.wireVersion.Max < 6 {
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
cs.err = cs.aggregate.Execute(ctx)
if cs.err != nil {
cs.err = replaceErrors(cs.err)
return cs.Err()
cs.err = nil
cr := cs.aggregate.ResultCursorResponse()
cr.Server = server
cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
if cs.err = replaceErrors(cs.err); cs.err != nil {
return cs.Err()
if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 &&
cs.emptyBatch() && cs.resumeToken == nil {
cs.operationTime = cs.sess.OperationTime
return cs.Err()
// Updates the post batch resume token after a successful aggregate or getMore operation.
func (cs *ChangeStream) updatePbrtFromCommand() {
// Only cache the pbrt if an empty batch was returned and a pbrt was included
if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil {
cs.resumeToken = bson.Raw(pbrt)
func (cs *ChangeStream) storeResumeToken() error {
// If cs.Current is the last document in the batch and a pbrt is included, cache the pbrt
// Otherwise, cache the _id of the document
var tokenDoc bson.Raw
if len(cs.batch) == 0 {
if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil {
tokenDoc = bson.Raw(pbrt)
if tokenDoc == nil {
var ok bool
tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK()
if !ok {
_ = cs.Close(context.Background())
return ErrMissingResumeToken
cs.resumeToken = tokenDoc
return nil
func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
val := reflect.ValueOf(pipeline)
if !val.IsValid() || !(val.Kind() == reflect.Slice) {
cs.err = errors.New("can only transform slices and arrays into aggregation pipelines, but got invalid")
return cs.err
cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)
csIdx, csDoc := bsoncore.AppendDocumentStart(nil)
csDocTemp, err := cs.createPipelineOptionsDoc()
if err != nil {
return err
csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp)
csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
if cs.err != nil {
return cs.err
cs.pipelineSlice = append(cs.pipelineSlice, csDoc)
for i := 0; i < val.Len(); i++ {
var elem []byte
elem, cs.err = transformBsoncoreDocument(cs.registry, val.Index(i).Interface(), true, fmt.Sprintf("pipeline stage :%v", i))
if cs.err != nil {
return cs.err
cs.pipelineSlice = append(cs.pipelineSlice, elem)
return cs.err
func (cs *ChangeStream) createPipelineOptionsDoc() (bsoncore.Document, error) {
plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil)
if cs.streamType == ClientStream {
plDoc = bsoncore.AppendBooleanElement(plDoc, "allChangesForCluster", true)
if cs.options.FullDocument != nil {
// Only append a default "fullDocument" field if wire version is less than 6 (3.6). Otherwise,
// the server will assume users want the default behavior, and "fullDocument" does not need to be
// specified.
if *cs.options.FullDocument != options.Default || (cs.wireVersion != nil && cs.wireVersion.Max < 6) {
plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument))
if cs.options.FullDocumentBeforeChange != nil {
plDoc = bsoncore.AppendStringElement(plDoc, "fullDocumentBeforeChange", string(*cs.options.FullDocumentBeforeChange))
if cs.options.ResumeAfter != nil {
var raDoc bsoncore.Document
raDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.ResumeAfter, true, "resumeAfter")
if cs.err != nil {
return nil, cs.err
plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc)
if cs.options.ShowExpandedEvents != nil {
plDoc = bsoncore.AppendBooleanElement(plDoc, "showExpandedEvents", *cs.options.ShowExpandedEvents)
if cs.options.StartAfter != nil {
var saDoc bsoncore.Document
saDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.StartAfter, true, "startAfter")
if cs.err != nil {
return nil, cs.err
plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc)
if cs.options.StartAtOperationTime != nil {
plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
// Append custom pipeline options.
for optionName, optionValue := range cs.pipelineOptions {
plDoc = bsoncore.AppendValueElement(plDoc, optionName, optionValue)
if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
return nil, cs.err
return plDoc, nil
func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
pipelineDocIdx, pipelineArr := bsoncore.AppendArrayStart(nil)
for i, doc := range cs.pipelineSlice {
pipelineArr = bsoncore.AppendDocumentElement(pipelineArr, strconv.Itoa(i), doc)
if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil {
return nil, cs.err
return pipelineArr, cs.err
func (cs *ChangeStream) replaceOptions(wireVersion *description.VersionRange) {
// Cached resume token: use the resume token as the resumeAfter option and set no other resume options
if cs.resumeToken != nil {
// No cached resume token but cached operation time: use the operation time as the startAtOperationTime option and
// set no other resume options
if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= 7 {
opTime := cs.options.StartAtOperationTime
if cs.operationTime != nil {
opTime = cs.sess.OperationTime
// No cached resume token or operation time: set none of the resume options
// ID returns the ID for this change stream, or 0 if the cursor has been closed or exhausted.
func (cs *ChangeStream) ID() int64 {
if cs.cursor == nil {
return 0
return cs.cursor.ID()
// Decode will unmarshal the current event document into val and return any errors from the unmarshalling process
// without any modification. If val is nil or is a typed nil, an error will be returned.
func (cs *ChangeStream) Decode(val interface{}) error {
if cs.cursor == nil {
return ErrNilCursor
return bson.UnmarshalWithRegistry(cs.registry, cs.Current, val)
// Err returns the last error seen by the change stream, or nil if no errors has occurred.
func (cs *ChangeStream) Err() error {
if cs.err != nil {
return replaceErrors(cs.err)
if cs.cursor == nil {
return nil
return replaceErrors(cs.cursor.Err())
// Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been
// called. Close is idempotent. After the first call, any subsequent calls will not change the state.
func (cs *ChangeStream) Close(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
defer closeImplicitSession(cs.sess)
if cs.cursor == nil {
return nil // cursor is already closed
cs.err = replaceErrors(cs.cursor.Close(ctx))
cs.cursor = nil
return cs.Err()
// ResumeToken returns the last cached resume token for this change stream, or nil if a resume token has not been
// stored.
func (cs *ChangeStream) ResumeToken() bson.Raw {
return cs.resumeToken
// Next gets the next event for this change stream. It returns true if there were no errors and the next event document
// is available.
// Next blocks until an event is available, an error occurs, or ctx expires. If ctx expires, the error
// will be set to ctx.Err(). In an error case, Next will return false.
// If Next returns false, subsequent calls will also return false.
func (cs *ChangeStream) Next(ctx context.Context) bool {
return cs.next(ctx, false)
// TryNext attempts to get the next event for this change stream. It returns true if there were no errors and the next
// event document is available.
// TryNext returns false if the change stream is closed by the server, an error occurs when getting changes from the
// server, the next change is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err().
// If TryNext returns false and an error occurred or the change stream was closed
// (i.e. cs.Err() != nil || cs.ID() == 0), subsequent attempts will also return false. Otherwise, it is safe to call
// TryNext again until a change is available.
// This method requires driver version >= 1.2.0.
func (cs *ChangeStream) TryNext(ctx context.Context) bool {
return cs.next(ctx, true)
func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
// return false right away if the change stream has already errored or if cursor is closed.
if cs.err != nil {
return false
if ctx == nil {
ctx = context.Background()
if len(cs.batch) == 0 {
cs.loopNext(ctx, nonBlocking)
if cs.err != nil {
cs.err = replaceErrors(cs.err)
return false
if len(cs.batch) == 0 {
return false
// successfully got non-empty batch
cs.Current = bson.Raw(cs.batch[0])
cs.batch = cs.batch[1:]
if cs.err = cs.storeResumeToken(); cs.err != nil {
return false
return true
func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
for {
if cs.cursor == nil {
if cs.cursor.Next(ctx) {
// non-empty batch returned
cs.batch, cs.err = cs.cursor.Batch().Documents()
cs.err = replaceErrors(cs.cursor.Err())
if cs.err == nil {
// Check if cursor is alive
if cs.ID() == 0 {
// If a getMore was done but the batch was empty, the batch cursor will return false with no error.
// Update the tracked resume token to catch the post batch resume token from the server response.
if nonBlocking {
// stop after a successful getMore, even though the batch was empty
continue // loop getMore until a non-empty batch is returned or an error occurs
if !cs.isResumableError() {
// ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch
_ = cs.cursor.Close(ctx)
if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
func (cs *ChangeStream) isResumableError() bool {
commandErr, ok := cs.err.(CommandError)
if !ok || commandErr.HasErrorLabel(networkErrorLabel) {
// All non-server errors or network errors are resumable.
return true
if commandErr.Code == errorCursorNotFound {
return true
// For wire versions 9 and above, a server error is resumable if it has the ResumableChangeStreamError label.
if cs.wireVersion != nil && cs.wireVersion.Includes(minResumableLabelWireVersion) {
return commandErr.HasErrorLabel(resumableErrorLabel)
// For wire versions below 9, a server error is resumable if its code is on the allowlist.
_, resumable := resumableChangeStreamErrors[commandErr.Code]
return resumable
// Returns true if the underlying cursor's batch is empty
func (cs *ChangeStream) emptyBatch() bool {
return cs.cursor.Batch().Empty()
// StreamType represents the cluster type against which a ChangeStream was created.
type StreamType uint8
// These constants represent valid change stream types. A change stream can be initialized over a collection, all
// collections in a database, or over a cluster.
const (
CollectionStream StreamType = iota