diff --git a/.gitignore b/.gitignore index 18207d7..d668d31 100755 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +.idea # Test binary, built with `go test -c` *.test diff --git a/cli/api.go b/cli/api.go new file mode 100644 index 0000000..95a1d7c --- /dev/null +++ b/cli/api.go @@ -0,0 +1,38 @@ +package cli + +import ( + "github.com/NethermindEth/posmoni/pkg/api" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "strconv" +) + +var ( + port int64 +) + +// serverCmd represents the api command +var serverCmd = &cobra.Command{ + Use: "api", + Short: "Starts a api to monitor validators of Ethereum Beacon Chain", + Long: ` +Starts a api to monitor Ethereum Beacon Chain validator's balance changes and missed attestations using Beacon Chain official HTTP API. +`, + Run: func(cmd *cobra.Command, args []string) { + ExecuteServer() + }, +} + +func init() { + RootCmd.AddCommand(serverCmd) + + serverCmd.Flags().Int64Var(&port, "port", 12001, "Port to listen to") +} + +func ExecuteServer() { + // Execute api + server := api.Server{} + server.Initialize() + log.Info("Starting server") + server.Run(":" + strconv.FormatInt(port, 10)) +} diff --git a/go.mod b/go.mod index d7b3b5b..02352a8 100755 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.19 require ( github.com/antonfisher/nested-logrus-formatter v1.3.1 github.com/cenkalti/backoff/v4 v4.1.2 + github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.5.0 github.com/r3labs/sse/v2 v2.7.7 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 diff --git a/go.sum b/go.sum index b146fc0..3ec7656 100755 --- a/go.sum +++ b/go.sum @@ -8,6 +8,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= diff --git a/pkg/api/server.go b/pkg/api/server.go new file mode 100644 index 0000000..5dc96f8 --- /dev/null +++ b/pkg/api/server.go @@ -0,0 +1,106 @@ +package api + +import ( + "time" + + log "github.com/sirupsen/logrus" + + "encoding/json" + "github.com/NethermindEth/posmoni/pkg/eth2" + "github.com/NethermindEth/posmoni/pkg/eth2/db" + net "github.com/NethermindEth/posmoni/pkg/eth2/networking" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "net/http" +) + +var ( + upgrader = websocket.Upgrader{} +) + +type Server struct { + Router *mux.Router +} + +// Initialize the Server +func (s *Server) Initialize() { + s.Router = mux.NewRouter() + s.initializeRoutes() +} + +func (s *Server) Run(address string) { + log.Fatal(http.ListenAndServe(address, s.Router)) +} + +type info struct { + ConsensusUrls []string `json:"consensus_urls"` + ExecutionUrls []string `json:"execution_urls"` + Wait time.Duration `json:"wait"` +} + +// trackSync handle incoming request to track sync +func (s *Server) trackSync(w http.ResponseWriter, r *http.Request) { + //// Upgrade upgrades the HTTP server connection to the WebSocket protocol. + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Print("upgrade failed: ", err) + return + } + defer conn.Close() + + _, message, err := conn.ReadMessage() + if err != nil { + return + } + var infoVal info + err = json.Unmarshal(message, &infoVal) + if err != nil { + return + } + monitor, err := eth2.NewEth2Monitor( + db.EmptyRepository{}, + &net.BeaconClient{RetryDuration: time.Minute * 10}, + &net.ExecutionClient{RetryDuration: time.Minute * 10}, + net.SubscribeOpts{}, + eth2.ConfigOpts{ + HandleCfg: false, + Checkers: []eth2.CfgChecker{ + {Key: eth2.Execution, ErrMsg: eth2.NoExecutionFoundError, Data: infoVal.ExecutionUrls}, + {Key: eth2.Consensus, ErrMsg: eth2.NoConsensusFoundError, Data: infoVal.ConsensusUrls}, + }, + }, + ) + if err != nil { + log.Fatal(err) + } + + done := make(chan struct{}) + results := monitor.TrackSync(done, infoVal.ConsensusUrls, infoVal.ExecutionUrls, infoVal.Wait) + go func() { + messageType, _, _ := conn.ReadMessage() + switch messageType { + case websocket.CloseMessage: + done <- struct{}{} + close(done) + } + }() + for r := range results { + if r.Error != nil { + log.Errorf("Endpoint %s returned an error. Error: %v", r.Endpoint, r.Error) + } + msg, err := json.Marshal(r) + if err != nil { + log.Errorf("Error marshaling response: %v", err) + return + } + err = conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + log.Errorf("Error writing message: %v", err) + return + } + } +} + +func (s *Server) initializeRoutes() { + s.Router.HandleFunc("/trackSync", s.trackSync) +} diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go new file mode 100644 index 0000000..778f64e --- /dev/null +++ b/pkg/api/server_test.go @@ -0,0 +1 @@ +package api diff --git a/pkg/eth2/eth2.go b/pkg/eth2/eth2.go index 554f36f..f34cbb5 100755 --- a/pkg/eth2/eth2.go +++ b/pkg/eth2/eth2.go @@ -255,6 +255,7 @@ func (e *eth2Monitor) TrackSync(done <-chan struct{}, beaconEndpoints, execution select { case <-done: close(c) + log.WithFields(logFields).Info("TrackSync done") return case <-time.After(w): if w == 0 {