Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative support for Mongo Transactions in v4 #334

Open
wants to merge 4 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bson/bson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (s *S) TestPtrInline(c *C) {

// deeper struct with inline pointer
{
in := InlineG2{G2: 15, InlineG1: &InlineG1{G1:16, Final: &Final{G0: 23}}}
in := InlineG2{G2: 15, InlineG1: &InlineG1{G1: 16, Final: &Final{G0: 23}}}
c.Assert(in.InlineG1, NotNil)
c.Assert(in.Final, NotNil)

Expand Down Expand Up @@ -1272,11 +1272,11 @@ type Final struct {
G0 int `bson:"g0,omitempty"`
}
type InlineG1 struct {
G1 int `bson:"g1,omitempty"`
G1 int `bson:"g1,omitempty"`
*Final `bson:",inline"`
}
type InlineG2 struct {
G2 int `bson:"g2,omitempty"`
G2 int `bson:"g2,omitempty"`
*InlineG1 `bson:",inline"`
}

Expand Down
197 changes: 183 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,25 @@ type Session struct {
bypassValidation bool
slaveOk bool

dialInfo *DialInfo
dialInfo *DialInfo
sessionId bson.Binary
nextTransactionNumber int64
transaction *transaction
}

type sessionId struct {
Id bson.Binary `bson:"id"`
}

type sessionInfo struct {
Id sessionId `bson:"id"`
TimeoutMinutes int `bson:"timeoutMinutes"`
}

type transaction struct {
number int64
sessionId bson.Binary
started bool
}

// Database holds collections of documents
Expand Down Expand Up @@ -2035,12 +2053,23 @@ func (s *Session) Clone() *Session {
// Close terminates the session. It's a runtime error to use a session
// after it has been closed.
func (s *Session) Close() {
s.m.RLock()
txnActive := s.transaction != nil && s.transaction.started
s.m.RUnlock()
if txnActive {
// No chance to give the user an error
err := s.AbortTransaction()
if err != nil {
logf("abort during Session.Close failed: %v", err)
}
}
s.m.Lock()
if s.mgoCluster != nil {
debugf("Closing session %p", s)
s.unsetSocket()
s.mgoCluster.Release()
s.mgoCluster = nil
s.transaction = nil
}
s.m.Unlock()
}
Expand Down Expand Up @@ -3842,6 +3871,10 @@ func (q *Query) One(result interface{}) (err error) {
session := q.session
op := q.op // Copy.
q.m.Unlock()
session.m.RLock()
txn := session.transaction
startTxn := txn != nil && !txn.started
session.m.RUnlock()

socket, err := session.acquireSocket(true)
if err != nil {
Expand All @@ -3853,12 +3886,17 @@ func (q *Query) One(result interface{}) (err error) {

session.prepareQuery(&op)

expectFindReply := prepareFindOp(socket, &op, 1)
expectFindReply := prepareFindOp(socket, &op, 1, txn, startTxn)

data, err := socket.SimpleQuery(&op)
if err != nil {
return err
}
if startTxn {
session.m.Lock()
txn.started = true
session.m.Unlock()
}
if data == nil {
return ErrNotFound
}
Expand Down Expand Up @@ -3897,7 +3935,7 @@ func (q *Query) One(result interface{}) (err error) {
// a new-style find command if that's supported by the MongoDB server (3.2+).
// It returns whether to expect a find command result or not. Note op may be
// translated into an explain command, in which case the function returns false.
func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32, txn *transaction, startTxn bool) bool {
if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" {
return false
}
Expand Down Expand Up @@ -3936,6 +3974,16 @@ func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
find.BatchSize = op.limit
}

if txn != nil {
if startTxn {
find.StartTransaction = true
}
find.TXNNumber = txn.number
find.LSID = bson.D{{Name: "id", Value: txn.sessionId}}
autocommit := false
find.Autocommit = &autocommit
}

explain := op.options.Explain

op.collection = op.collection[:nameDot] + ".$cmd"
Expand Down Expand Up @@ -3990,6 +4038,10 @@ type findCmd struct {
NoCursorTimeout bool `bson:"noCursorTimeout,omitempty"`
AllowPartialResults bool `bson:"allowPartialResults,omitempty"`
Collation *Collation `bson:"collation,omitempty"`
LSID bson.D `bson:"lsid,omitempty"`
TXNNumber int64 `bson:"txnNumber,omitempty"`
Autocommit *bool `bson:"autocommit,omitempty"`
StartTransaction bool `bson:"startTransaction,omitempty"`
}

// readLevel provides the nested "level: majority" serialisation needed for the
Expand Down Expand Up @@ -4204,6 +4256,10 @@ func (q *Query) Iter() *Iter {
prefetch := q.prefetch
limit := q.limit
q.m.Unlock()
session.m.RLock()
txn := session.transaction
startTxn := txn != nil && !txn.started
session.m.RUnlock()

iter := &Iter{
session: session,
Expand All @@ -4227,7 +4283,7 @@ func (q *Query) Iter() *Iter {
session.prepareQuery(&op)
op.replyFunc = iter.op.replyFunc

if prepareFindOp(socket, &op, limit) {
if prepareFindOp(socket, &op, limit, txn, startTxn) {
iter.isFindCmd = true
}

Expand All @@ -4239,6 +4295,11 @@ func (q *Query) Iter() *Iter {
iter.err = err
iter.m.Unlock()
}
if startTxn {
session.m.Lock()
txn.started = true
session.m.Unlock()
}

return iter
}
Expand Down Expand Up @@ -5475,6 +5536,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
s.m.RLock()
safeOp := s.safeOp
bypassValidation := s.bypassValidation
txn := s.transaction
s.m.RUnlock()

if socket.ServerInfo().MaxWireVersion >= 2 {
Expand All @@ -5490,7 +5552,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
l = len(all)
}
op.documents = all[i:l]
oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation, txn)
lerr.N += oplerr.N
lerr.modified += oplerr.modified
if err != nil {
Expand Down Expand Up @@ -5518,7 +5580,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
l = len(updateOp)
}

oplerr, err := c.writeOpCommand(socket, safeOp, updateOp[i:l], ordered, bypassValidation)
oplerr, err := c.writeOpCommand(socket, safeOp, updateOp[i:l], ordered, bypassValidation, txn)

lerr.N += oplerr.N
lerr.modified += oplerr.modified
Expand All @@ -5544,7 +5606,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
l = len(deleteOps)
}

oplerr, err := c.writeOpCommand(socket, safeOp, deleteOps[i:l], ordered, bypassValidation)
oplerr, err := c.writeOpCommand(socket, safeOp, deleteOps[i:l], ordered, bypassValidation, txn)

lerr.N += oplerr.N
lerr.modified += oplerr.modified
Expand All @@ -5560,7 +5622,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
}
return &lerr, nil
}
return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation, txn)
} else if updateOps, ok := op.(bulkUpdateOp); ok {
var lerr LastError
for i, updateOp := range updateOps {
Expand Down Expand Up @@ -5645,7 +5707,7 @@ func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op inter
return result, nil
}

func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) {
func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool, txn *transaction) (lerr *LastError, err error) {
var writeConcern interface{}
if safeOp == nil {
writeConcern = bson.D{{Name: "w", Value: 0}}
Expand All @@ -5660,45 +5722,60 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int
cmd = bson.D{
{Name: "insert", Value: c.Name},
{Name: "documents", Value: op.documents},
{Name: "writeConcern", Value: writeConcern},
{Name: "ordered", Value: op.flags&1 == 0},
}
case *updateOp:
// http://docs.mongodb.org/manual/reference/command/update
cmd = bson.D{
{Name: "update", Value: c.Name},
{Name: "updates", Value: []interface{}{op}},
{Name: "writeConcern", Value: writeConcern},
{Name: "ordered", Value: ordered},
}
case bulkUpdateOp:
// http://docs.mongodb.org/manual/reference/command/update
cmd = bson.D{
{Name: "update", Value: c.Name},
{Name: "updates", Value: op},
{Name: "writeConcern", Value: writeConcern},
{Name: "ordered", Value: ordered},
}
case *deleteOp:
// http://docs.mongodb.org/manual/reference/command/delete
cmd = bson.D{
{Name: "delete", Value: c.Name},
{Name: "deletes", Value: []interface{}{op}},
{Name: "writeConcern", Value: writeConcern},
{Name: "ordered", Value: ordered},
}
case bulkDeleteOp:
// http://docs.mongodb.org/manual/reference/command/delete
cmd = bson.D{
{Name: "delete", Value: c.Name},
{Name: "deletes", Value: op},
{Name: "writeConcern", Value: writeConcern},
{Name: "ordered", Value: ordered},
}
}
if bypassValidation {
cmd = append(cmd, bson.DocElem{Name: "bypassDocumentValidation", Value: true})
}
started := false
if txn != nil {
c.Database.Session.m.RLock()
if !txn.started {
cmd = append(cmd, bson.DocElem{Name: "startTransaction", Value: true})
started = true
}
c.Database.Session.m.RUnlock()
cmd = append(cmd, bson.DocElem{Name: "autocommit", Value: false})
cmd = append(cmd, bson.DocElem{Name: "txnNumber", Value: txn.number})
cmd = append(cmd, bson.DocElem{Name: "lsid", Value: bson.M{"id": txn.sessionId}})
} else {
cmd = append(cmd, bson.DocElem{Name: "writeConcern", Value: writeConcern})
}

if started {
c.Database.Session.m.Lock()
txn.started = started
c.Database.Session.m.Unlock()
}

var result writeCmdResult
err = c.Database.run(socket, cmd, &result)
Expand Down Expand Up @@ -5811,3 +5888,95 @@ func rdnOIDToShortName(oid asn1.ObjectIdentifier) string {

return ""
}

func (s *Session) ensureSessionId() error {
s.m.RLock()
if len(s.sessionId.Data) != 0 {
s.m.RUnlock()
return nil
}
s.m.RUnlock()
var info sessionInfo
// TODO(jam): 2019-02-27 the startSession call can take a few optional parameters.
// We could put them as Session attributes that we pass along. It seems to be
// things like 'casual consistency' and 'write preference', which we seem to be
// setting elsewhere.

err := s.Run("startSession", &info)
if err != nil {
return err
}
s.m.Lock()
s.sessionId = info.Id.Id
s.m.Unlock()
return nil
}

func (s *Session) StartTransaction() error {
if err := s.ensureSessionId(); err != nil {
return err
}
s.m.Lock()
if s.transaction != nil {
s.m.Unlock()
return errors.New("transaction already started")
}
s.nextTransactionNumber++
s.transaction = &transaction{
number: s.nextTransactionNumber,
sessionId: s.sessionId,
started: false,
}
s.m.Unlock()
// TODO: readConcern, writeConcern, readPreference can all be set separately for a given transaction
return nil
}

func (s *Session) finishTransaction(command string) error {
s.m.RLock()
if len(s.sessionId.Data) == 0 {
s.m.RUnlock()
return errors.New("no transaction in progress")
}
if s.transaction == nil {
s.m.RUnlock()
return errors.New("no transaction in progress")
}
txn := s.transaction
sessionId := s.sessionId
txnNumber := txn.number
started := txn.started
s.m.RUnlock()
var err error
if started {
// XXX: Python has a retry tracking 'retryable' errors around finishTransaction
cmd := bson.D{
{Name: command, Value: 1},
{Name: "txnNumber", Value: txnNumber},
{Name: "autocommit", Value: false},
{Name: "lsid", Value: bson.M{"id": sessionId}},
}
err = s.Run(cmd, nil)
}
s.m.Lock()
if s.transaction == txn {
s.transaction = nil
} else {
// TODO: How to exercise this code?
err = errors.New(fmt.Sprintf("transaction changed during %s, %v != %v",
command, txn, s.transaction))
}
s.m.Unlock()
if err != nil {
return err
}
return nil
}

func (s *Session) CommitTransaction() error {
return s.finishTransaction("commitTransaction")
}

func (s *Session) AbortTransaction() error {
return s.finishTransaction("abortTransaction")
}
Loading