This repository has been archived by the owner on Oct 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 213
/
postgres.go
68 lines (55 loc) · 1.56 KB
/
postgres.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package postgres
import (
"sync"
"github.com/compose/transporter/adaptor"
"github.com/compose/transporter/client"
_ "github.com/lib/pq" // import pq driver
)
const (
	// description is the one-line summary returned by Description.
	description = "a postgres adaptor that functions as both a source and a sink"

	// sampleConfig is the example configuration returned by SampleConfig.
	// Only "uri" is active; the debug, tail and replication_slot settings are
	// shown commented out. The "uri" line carries no trailing comma, so one
	// has to be added when any of the optional settings is uncommented.
	sampleConfig = `{
"uri": "${POSTGRESQL_URI}"
// "debug": false,
// "tail": false,
// "replication_slot": "slot"
}`
)
// Compile-time assertion that *postgres satisfies adaptor.Adaptor. A typed nil
// pointer is enough for the check, so no postgres value is constructed.
var _ adaptor.Adaptor = (*postgres)(nil)
// postgres is the adaptor used to read from and write to postgres.
// Its Reader method returns a plain reader, or a tailer bound to
// ReplicationSlot when Tail is set.
type postgres struct {
	// BaseConfig is embedded; the URI read by Client is promoted from it.
	adaptor.BaseConfig
	// Debug enables the display of debug information (per its doc tag).
	Debug bool `json:"debug" doc:"display debug information"`
	// Tail makes Reader return a tailer instead of the plain reader.
	Tail bool `json:"tail" doc:"if tail is true, then the postgres source will tail the oplog after copying the namespace"`
	// ReplicationSlot is handed to the tailer when Tail is set; per its doc
	// tag it names the replication slot used for logical decoding.
	ReplicationSlot string `json:"replication_slot" doc:"required if tail is true; sets the replication slot to use for logical decoding"`
}
// init registers a constructor for this adaptor with the adaptor package
// under the name "postgres". The constructor hands back a fresh, zero-valued
// *postgres on every call.
func init() {
	newPostgres := func() adaptor.Adaptor { return new(postgres) }
	adaptor.Add("postgres", newPostgres)
}
// Client builds the client for this adaptor by calling NewClient with a
// WithURI option carrying the configured URI. Whatever NewClient returns,
// including its error, is passed straight back to the caller.
func (p *postgres) Client() (client.Client, error) {
	uriOption := WithURI(p.URI)
	return NewClient(uriOption)
}
// Reader returns the reader matching the configuration: the plain reader
// when Tail is unset, otherwise a tailer created with ReplicationSlot.
// The returned error is always nil.
func (p *postgres) Reader() (client.Reader, error) {
	if !p.Tail {
		return newReader(), nil
	}
	return newTailer(p.ReplicationSlot), nil
}
// Writer returns a new writer for this adaptor. The done channel and the
// wait group are accepted but not used here, and the returned error is
// always nil.
func (p *postgres) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error) {
	w := newWriter()
	return w, nil
}
// Description returns the short summary of the postgres adaptor held in the
// description constant.
func (p *postgres) Description() string {
	return description
}
// SampleConfig returns the example configuration for the postgres adaptor
// held in the sampleConfig constant.
func (p *postgres) SampleConfig() string {
	return sampleConfig
}