Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to MaxStalenessSeconds in ReadPreference #271

Open
wants to merge 3 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ const (
// false: Initiate the connection without TLS/SSL.
// The default value is false.
//
// maxStalenessSeconds=<seconds>
//
// specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries, minimum value allowed is 90.
// Works on MongoDB 3.4+
//
// Relevant documentation:
//
// http://docs.mongodb.org/manual/reference/connection-string/
Expand Down Expand Up @@ -353,6 +358,7 @@ func ParseURL(url string) (*DialInfo, error) {
var readPreferenceTagSets []bson.D
minPoolSize := 0
maxIdleTimeMS := 0
maxStalenessSeconds := 0
safe := Safe{}
for _, opt := range uinfo.options {
switch opt.key {
Expand Down Expand Up @@ -390,6 +396,17 @@ func ParseURL(url string) (*DialInfo, error) {
if err != nil {
return nil, errors.New("bad value for maxPoolSize: " + opt.value)
}
case "maxStalenessSeconds":
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the documentation maxStalenessSeconds is not compatible with mode primary, so maybe you could add that restriction to the existing check below (line 460).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, could you please add description of this parameter to the Dial method documentation (i.e. https://godoc.org/github.com/globalsign/mgo#Dial)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @eminano and @szank, could you see my commits updates ?, I committed some changes.

maxStalenessSeconds, err = strconv.Atoi(opt.value)

if err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would need the maxStalenessSeconds >= 0 check to compliment the SetMaxStalenessSeconds() behaviour?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @domodwyer, I've added you recommendation.

return nil, errors.New("bad value for maxStalenessSeconds: " + opt.value)
}

if maxStalenessSeconds > 0 && maxStalenessSeconds < 90 {
return nil, errors.New("maxStalenessSeconds too low " + opt.value + ", must be >= 90 seconds")
}

case "appName":
if len(opt.value) > 128 {
return nil, errors.New("appName too long, must be < 128 bytes: " + opt.value)
Expand Down Expand Up @@ -455,6 +472,10 @@ func ParseURL(url string) (*DialInfo, error) {
return nil, errors.New("readPreferenceTagSet may not be specified when readPreference is primary")
}

if readPreferenceMode == Primary && maxStalenessSeconds > 0 {
return nil, errors.New("maxStalenessSeconds may not be specified when readPreference is primary")
}

info := DialInfo{
Addrs: uinfo.addrs,
Direct: direct,
Expand All @@ -469,6 +490,8 @@ func ParseURL(url string) (*DialInfo, error) {
ReadPreference: &ReadPreference{
Mode: readPreferenceMode,
TagSets: readPreferenceTagSets,

MaxStalenessSeconds: maxStalenessSeconds,
},
Safe: safe,
ReplicaSetName: setName,
Expand Down Expand Up @@ -607,6 +630,8 @@ func (i *DialInfo) Copy() *DialInfo {
if i.ReadPreference != nil {
readPreference = &ReadPreference{
Mode: i.ReadPreference.Mode,

MaxStalenessSeconds: i.ReadPreference.MaxStalenessSeconds,
}
readPreference.TagSets = make([]bson.D, len(i.ReadPreference.TagSets))
copy(readPreference.TagSets, i.ReadPreference.TagSets)
Expand Down Expand Up @@ -679,6 +704,9 @@ type ReadPreference struct {
// Mode determines the consistency of results. See Session.SetMode.
Mode Mode

// MaxStalenessSeconds specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a comment stating that this option is supported in mongo >= 3.4 only ?

MaxStalenessSeconds int

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this missing an omitempty tag? I imagine you don't want to send maxStalenessSeconds: 0 with every request if it was not set by the user.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @domodwyer, the wire information is sent by socket.go(line: 131), I suppose that tag is not necessary here, thanks.


// TagSets indicates which servers are allowed to be used. See Session.SelectServers.
TagSets []bson.D
}
Expand Down Expand Up @@ -768,6 +796,7 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) {
if info.ReadPreference != nil {
session.SelectServers(info.ReadPreference.TagSets...)
session.SetMode(info.ReadPreference.Mode, true)
session.SetMaxStalenessSeconds(info.ReadPreference.MaxStalenessSeconds)
} else {
session.SetMode(Strong, true)
}
Expand Down Expand Up @@ -2190,6 +2219,24 @@ func (s *Session) SetPoolTimeout(timeout time.Duration) {
s.m.Unlock()
}

// SetMaxStalenessSeconds set the maximum of seconds of replication lag from secondaries
// You must specify a maxStalenessSeconds value of 90 seconds or longer: specifying a smaller maxStalenessSeconds value will raise an error.
// Works on MongoDB 3.4+
//
// Relevant documentation:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth including that if seconds > 90 it's currently expected to receive an error from mongodb:

You must specify a maxStalenessSeconds value of 90 seconds or longer: specifying a smaller maxStalenessSeconds value will raise an error.

//
// https://docs.mongodb.com/manual/core/read-preference/#maxstalenessseconds
//
func (s *Session) SetMaxStalenessSeconds(seconds int) error {
s.m.Lock()
wpjunior marked this conversation as resolved.
Show resolved Hide resolved
defer s.m.Unlock()
if seconds > 0 && seconds < 90 {
return errors.New("SetMaxStalenessSeconds: minimum of seconds is 90")
}
s.queryConfig.op.maxStalenessSeconds = seconds
return nil
}

// SetBypassValidation sets whether the server should bypass the registered
// validation expressions executed when documents are inserted or modified,
// in the interest of preserving invariants in the collection being modified.
Expand Down
15 changes: 10 additions & 5 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,33 @@ func (s *S) TestURLReadPreference(c *C) {
type test struct {
url string
mode mgo.Mode

maxStalenessSeconds int
}

tests := []test{
{"localhost:40001?readPreference=primary", mgo.Primary},
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred},
{"localhost:40001?readPreference=secondary", mgo.Secondary},
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred},
{"localhost:40001?readPreference=nearest", mgo.Nearest},
{"localhost:40001?readPreference=primary", mgo.Primary, 0},
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred, 0},
{"localhost:40001?readPreference=secondary", mgo.Secondary, 0},
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred, 0},
{"localhost:40001?readPreference=secondary&maxStalenessSeconds=110", mgo.Secondary, 110},
{"localhost:40001?readPreference=nearest", mgo.Nearest, 0},
}

for _, test := range tests {
info, err := mgo.ParseURL(test.url)
c.Assert(err, IsNil)
c.Assert(info.ReadPreference, NotNil)
c.Assert(info.ReadPreference.Mode, Equals, test.mode)
c.Assert(info.ReadPreference.MaxStalenessSeconds, Equals, test.maxStalenessSeconds)
}
}

func (s *S) TestURLInvalidReadPreference(c *C) {
urls := []string{
"localhost:40001?readPreference=foo",
"localhost:40001?readPreference=primarypreferred",
"localhost:40001?readPreference=primary&maxStalenessSeconds=90",
}
for _, url := range urls {
_, err := mgo.ParseURL(url)
Expand Down
8 changes: 7 additions & 1 deletion socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type queryOp struct {
hasOptions bool
flags queryOpFlags
readConcern string

maxStalenessSeconds int
}

type queryWrapper struct {
Expand Down Expand Up @@ -120,11 +122,15 @@ func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
panic(fmt.Sprintf("unsupported read mode: %d", op.mode))
}
op.hasOptions = true
op.options.ReadPreference = make(bson.D, 0, 2)
op.options.ReadPreference = make(bson.D, 0, 3)
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "mode", Value: modeName})
if len(op.serverTags) > 0 {
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "tags", Value: op.serverTags})
}

if op.maxStalenessSeconds > 0 {
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "maxStalenessSeconds", Value: op.maxStalenessSeconds})
}
}
if op.hasOptions {
if op.query == nil {
Expand Down