Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cobra-ify daemon command and remove panics #8

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.2.2"

[[constraint]]
name = "github.com/spf13/cobra"
version = "0.0.3"

[[constraint]]
name = "github.com/pkg/errors"
version = "0.8.0"
19 changes: 8 additions & 11 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/db"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -51,40 +52,37 @@ type Processor struct {
}

// NewProcessor creates a new blockchain processor
func NewProcessor() Processor {
func NewProcessor() (Processor, error) {
// TODO(aiden) accept configuration as a parameter

p := Processor{
jobCompletionQueue: make(chan *jobInfo, 1000),
}

if !config.GetBool(config.BlockchainEnabledKey) {
return p
return p, nil
}

// Setup ethereum client
if client, err := rpc.Dial(config.GetString(config.EthereumJsonRpcEndpointKey)); err != nil {
// TODO(ai): return (processor, error) instead of panic
log.WithError(err).Panic("error creating rpc client")
return p, errors.Wrap(err, "error creating RPC client")
} else {
p.rawClient = client
p.ethClient = ethclient.NewClient(client)
}

// Setup agent
agentAddress := common.HexToAddress(config.GetString(config.AgentContractAddressKey))

// Setup agent
if a, err := NewAgent(agentAddress, p.ethClient); err != nil {
// TODO(ai): remove panic
log.WithError(err).Panic("error instantiating agent")
return p, errors.Wrap(err, "error instantiating agent")
} else {
p.agent = a
}

// Determine "version" of agent contract and set local signature hash creator
if bytecode, err := p.ethClient.CodeAt(context.Background(), agentAddress, nil); err != nil {
log.WithError(err).Panic("error retrieving agent bytecode")
return p, errors.Wrap(err, "error retrieving agent bytecode")
} else {
bcSum := md5.Sum(bytecode)

Expand All @@ -104,8 +102,7 @@ func NewProcessor() Processor {
// Setup identity
if privateKeyString := config.GetString(config.PrivateKeyKey); privateKeyString != "" {
if privKey, err := crypto.HexToECDSA(privateKeyString); err != nil {
// TODO(ai): remove panic
log.WithError(err).Panic("error getting private key")
return p, errors.Wrap(err, "error getting private key")
} else {
p.privateKey = privKey
p.address = crypto.PubkeyToAddress(p.privateKey.PublicKey).Hex()
Expand All @@ -119,7 +116,7 @@ func NewProcessor() Processor {
}
}

return p
return p, nil
}

func (p Processor) GrpcStreamInterceptor() grpc.StreamServerInterceptor {
Expand Down
16 changes: 10 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import (
"errors"
"fmt"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -56,36 +58,38 @@ func init() {
}
}

func Validate() {
func Validate() error {
switch dType := vip.GetString(DaemonTypeKey); dType {
case "grpc":
switch sType := vip.GetString(ServiceTypeKey); sType {
case "grpc":
case "jsonrpc":
case "process":
if vip.GetString(ExecutablePathKey) == "" {
log.Panic("EXECUTABLE required with SERVICE_TYPE 'process'")
return errors.New("EXECUTABLE required with SERVICE_TYPE 'process'")
}
default:
log.Panicf("unrecognized SERVICE_TYPE '%+v'", sType)
return fmt.Errorf("unrecognized SERVICE_TYPE '%+v'", sType)
}

switch enc := vip.GetString(WireEncodingKey); enc {
case "proto":
case "json":
default:
log.Panicf("unrecognized WIRE_ENCODING '%+v'", enc)
return fmt.Errorf("unrecognized WIRE_ENCODING '%+v'", enc)
}
case "http":
default:
log.Panicf("unrecognized DAEMON_TYPE '%+v'", dType)
return fmt.Errorf("unrecognized DAEMON_TYPE '%+v'", dType)
}

if vip.GetBool(BlockchainEnabledKey) {
if vip.GetString(PrivateKeyKey) == "" && vip.GetString(HdwalletMnemonicKey) == "" {
log.Panic("either PRIVATE_KEY or HDWALLET_MNEMONIC are required")
return errors.New("either PRIVATE_KEY or HDWALLET_MNEMONIC are required")
}
}

return nil
}

func GetString(key string) string {
Expand Down
2 changes: 1 addition & 1 deletion resources/test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestEndToEnd(t *testing.T) {

require.NoError(t, runCommand("", nil, "go", "build", "-o",
filepath.Join(buildPath, "snetd"),
filepath.Join(cwd, "..", "..", "snetd", "snetd.go"),
filepath.Join(cwd, "..", "..", "snetd", "main.go"),
).Wait(), "Unable to build snetd")

runCommand(blockchainPath, nil, "npm", "run", "create-mnemonic").Wait()
Expand Down
115 changes: 115 additions & 0 deletions snetd/cmd/serve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package cmd

import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"

"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/pkg/errors"
"github.com/singnet/snet-daemon/blockchain"
"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/db"
"github.com/singnet/snet-daemon/handler"
log "github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)

var ServeCmd = &cobra.Command{
Use: "snetd",
Run: func(cmd *cobra.Command, args []string) {
d, err := newDaemon()
if err != nil {
log.WithError(err).Error("Unable to initialize daemon")
os.Exit(2)
}

d.start()
defer d.stop()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
<-sigChan

log.Debug("exiting")
},
}

type daemon struct {
grpcServer *grpc.Server
blockProc blockchain.Processor
lis net.Listener
}

func newDaemon() (daemon, error) {
d := daemon{}

if err := config.Validate(); err != nil {
return d, err
}

var err error
d.lis, err = net.Listen("tcp", fmt.Sprintf("0.0.0.0:%+v",
config.GetInt(config.DaemonListeningPortKey)))
if err != nil {
return d, errors.Wrap(err, "error listening")
}

d.blockProc, err = blockchain.NewProcessor()
if err != nil {
return d, errors.Wrap(err, "unable to initialize blockchain processor")
}

return d, nil
}

func (d daemon) start() {
d.blockProc.StartLoop()

if config.GetString(config.DaemonTypeKey) == "grpc" {
mux := cmux.New(d.lis)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type",
"application/grpc"))
httpL := mux.Match(cmux.HTTP1Fast())

d.grpcServer = grpc.NewServer(grpc.UnknownServiceHandler(handler.GetGrpcHandler()),
grpc.StreamInterceptor(d.blockProc.GrpcStreamInterceptor()))
grpcWebServer := grpcweb.WrapServer(d.grpcServer)

log.Debug("starting daemon")

go d.grpcServer.Serve(grpcL)
go http.Serve(httpL, http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
if grpcWebServer.IsGrpcWebRequest(req) {
grpcWebServer.ServeHTTP(resp, req)
} else {
if strings.Split(req.URL.Path, "/")[1] == "encoding" {
fmt.Fprint(resp, config.GetString(config.WireEncodingKey))
} else {
http.NotFound(resp, req)
}
}
}))
go mux.Serve()
} else {
log.Debug("starting daemon")

go http.Serve(d.lis, handler.GetHttpHandler(d.blockProc))
}
}

func (d daemon) stop() {
db.Shutdown()

if d.grpcServer != nil {
d.grpcServer.Stop()
}

// TODO(aiden) add d.blockProc.StopLoop()
}
19 changes: 19 additions & 0 deletions snetd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"os"

"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/snetd/cmd"

log "github.com/sirupsen/logrus"
)

func main() {
log.SetLevel(log.Level(config.GetInt(config.LogLevelKey)))

if err := cmd.ServeCmd.Execute(); err != nil {
log.WithError(err).Error("Unable to serve")
os.Exit(1)
}
}
Loading