diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a975fad --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,11 @@ +--- +- repo: git://github.com/dnephin/pre-commit-golang + sha: HEAD + hooks: + - id: go-fmt + - id: go-lint + +- repo: git://github.com/adrienverge/yamllint + sha: HEAD + hooks: + - id: yamllint diff --git a/README.md b/README.md new file mode 100644 index 0000000..18d7f8f --- /dev/null +++ b/README.md @@ -0,0 +1,51 @@ +# pgterminate +> Terminates active and idle PostgreSQL backends + +Did you encountered long-running queries locking down your entire company system because of a massive lock on the database? Or a scatterbrained developper connected to the production database with an open transaction leading to a production outage? Never? Really? Either you have very good policies and that's awesome, or you don't work in databases at all. + +With `pgterminate`, you shouldn't be paged at night because some queries has locked down production for too long. It looks after "active" and "idle" connections and terminate them. As simple as that. + +# Highlights +* `pgterminate` name is derived from `pg_terminate_backend` function, it terminates backends. +* backends are called sessions in `pgterminate`. +* `active` sessions are backends in `active` state for more than `active-timeout` seconds. +* `idle` sessions are backends in `idle`, `idle in transaction` or `idle in transaction (abort)` state for more than `idle-timeout` seconds. +* at least one of `active-timeout` and `idle-timeout` parameter is required, both can be used. +* `pgterminate` relies on `libpq` for PostgreSQL connection. When `-host` is ommited, connection via unix socket is used. When `-user` is ommited, the unix user is used. And so on. +* time parameters, like `connect-timeout`, `active-timeout`, `idle-timeout` and `interval`, are represented in seconds. They accept float value except for `connect-timeout` which is an integer. +* if you want `pgterminate` to terminate any session, ensure it has SUPERUSER privileges. + +# Internals + +## Signals +`pgterminate` handles the following OS signals: +* `SIGINT`, `SIGTERM` to gracefully terminates the infinite loop +* `SIGHUP` to reload configuration file and re-open log file if used (handy for logrotate) + +## Configuration +There's two ways to configure `pgterminate`: +* command-line arguments +* configuration file with `-config` command-line argument + +Configuration file options **override** command-line arguments + +# Usage +Connect to a remote instance and prompt for password: +``` +pgterminate -host 10.0.0.1 -port 5432 -user test -prompt-password -database test +``` +Use a configuration file: +``` +pgterminate -config config.yaml +``` +Use both configuration file and command-line arguments: +``` +pgterminate -config config.yaml -interval 0.25 -active-timeout 10 -idle-timeout 300 +``` +Print usage: +``` +pgterminate -help +``` + +# License +`pgterminate` is released under [The Unlicense](https://github.com/jouir/pgterminate/blob/master/LICENSE) license. Code is under public domain. diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..6c6aa7c --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.1.0 \ No newline at end of file diff --git a/base/config.go b/base/config.go new file mode 100644 index 0000000..82fd21c --- /dev/null +++ b/base/config.go @@ -0,0 +1,97 @@ +package base + +import ( + "fmt" + "gopkg.in/yaml.v2" + "io/ioutil" + "log" + "path/filepath" + "strings" + "sync" +) + +// AppName exposes application name to config module +var AppName string + +// Config receives configuration options +type Config struct { + mutex sync.Mutex + File string + Host string `yaml:"host"` + Port int `yaml:"port"` + User string `yaml:"user"` + Password string `yaml:"password"` + Database string `yaml:"database"` + Interval float64 `yaml:"interval"` + ConnectTimeout int `yaml:"connect-timeout"` + IdleTimeout float64 `yaml:"idle-timeout"` + ActiveTimeout float64 `yaml:"active-timeout"` + LogFile string `yaml:"log-file"` + PidFile string `yaml:"pid-file"` +} + +func init() { + AppName = "pgterminate" +} + +// NewConfig creates a Config object +func NewConfig() *Config { + return &Config{} +} + +// Read loads options from a configuration file to Config +func (c *Config) Read(file string) error { + file, err := filepath.Abs(file) + if err != nil { + return err + } + + yamlFile, err := ioutil.ReadFile(file) + if err != nil { + return err + } + + err = yaml.Unmarshal(yamlFile, &c) + if err != nil { + return err + } + + return nil +} + +// Reload reads from file and update configuration +func (c *Config) Reload() { + log.Println("Reloading configuration") + c.mutex.Lock() + defer c.mutex.Unlock() + if c.File != "" { + c.Read(c.File) + } +} + +// Dsn formats a connection string based on Config +func (c *Config) Dsn() string { + var parameters []string + if c.Host != "" { + parameters = append(parameters, fmt.Sprintf("host=%s", c.Host)) + } + if c.Port != 0 { + parameters = append(parameters, fmt.Sprintf("port=%d", c.Port)) + } + if c.User != "" { + parameters = append(parameters, fmt.Sprintf("user=%s", c.User)) + } + if c.Password != "" { + parameters = append(parameters, fmt.Sprintf("password=%s", c.Password)) + } + if c.Database != "" { + parameters = append(parameters, fmt.Sprintf("database=%s", c.Database)) + } + if c.ConnectTimeout != 0 { + parameters = append(parameters, fmt.Sprintf("connect_timeout=%d", c.ConnectTimeout)) + } + if AppName != "" { + parameters = append(parameters, fmt.Sprintf("application_name=%s", AppName)) + } + return strings.Join(parameters, " ") +} diff --git a/base/context.go b/base/context.go new file mode 100644 index 0000000..6ccf5ed --- /dev/null +++ b/base/context.go @@ -0,0 +1,17 @@ +package base + +// Context stores dynamic values like channels and exposes configuration +type Context struct { + Sessions chan Session + Done chan bool + Config *Config +} + +// NewContext instanciates a Context +func NewContext(config *Config, sessions chan Session, done chan bool) *Context { + return &Context{ + Config: config, + Sessions: sessions, + Done: done, + } +} diff --git a/base/db.go b/base/db.go new file mode 100644 index 0000000..fcd49d6 --- /dev/null +++ b/base/db.go @@ -0,0 +1,76 @@ +package base + +import ( + "database/sql" + "github.com/lib/pq" + "strconv" +) + +const ( + maxQueryLength = 1000 +) + +// Db centralizes connection to the database +type Db struct { + dsn string + conn *sql.DB +} + +// NewDb creates a Db object +func NewDb(dsn string) *Db { + return &Db{ + dsn: dsn, + } +} + +// Connect connects to the instance and ping it to ensure connection is working +func (db *Db) Connect() { + conn, err := sql.Open("postgres", db.dsn) + Panic(err) + + err = conn.Ping() + Panic(err) + + db.conn = conn +} + +// Disconnect ends connection cleanly +func (db *Db) Disconnect() { + err := db.conn.Close() + Panic(err) +} + +// Sessions connects to the database and returns current sessions +func (db *Db) Sessions() (sessions []Session) { + query := `select pid as pid, usename as user, datname as db, host(client_addr)::text || ':' || client_port::text as client, state as state, substring(query from 1 for ` + strconv.Itoa(maxQueryLength) + `) as query, coalesce(extract(epoch from now() - backend_start), 0) as "backendDuration", coalesce(extract(epoch from now() - xact_start), 0) as "xactDuration", coalesce(extract(epoch from now() - query_start), 0) as "queryDuration" from pg_catalog.pg_stat_activity where pid <> pg_backend_pid();` + rows, err := db.conn.Query(query) + Panic(err) + defer rows.Close() + + for rows.Next() { + var pid sql.NullInt64 + var user, db, client, state, query sql.NullString + var backendDuration, xactDuration, queryDuration float64 + err := rows.Scan(&pid, &user, &db, &client, &state, &query, &backendDuration, &xactDuration, &queryDuration) + Panic(err) + + if pid.Valid && user.Valid && db.Valid && client.Valid && state.Valid && query.Valid { + sessions = append(sessions, NewSession(pid.Int64, user.String, db.String, client.String, state.String, query.String, backendDuration, xactDuration, queryDuration)) + } + } + + return sessions +} + +// TerminateSessions terminates a list of sessions +func (db *Db) TerminateSessions(sessions []Session) { + var pids []int64 + for _, session := range sessions { + pids = append(pids, session.Pid) + } + if len(pids) > 0 { + query := `select pg_terminate_backend(pid) from pg_stat_activity where pid = any($1);` + _, err := db.conn.Exec(query, pq.Array(pids)) + Panic(err) + } +} diff --git a/base/session.go b/base/session.go new file mode 100644 index 0000000..0ced787 --- /dev/null +++ b/base/session.go @@ -0,0 +1,67 @@ +package base + +import ( + "fmt" + "strings" +) + +// Session represents a PostgreSQL backend +type Session struct { + Pid int64 + User string + Db string + Client string + State string + Query string + BackendDuration float64 + XactDuration float64 + QueryDuration float64 +} + +// NewSession instanciates a Session +func NewSession(pid int64, user string, db string, client string, state string, query string, backendDuration float64, xactDuration float64, queryDuration float64) Session { + return Session{ + Pid: pid, + User: user, + Db: db, + Client: client, + State: state, + Query: query, + BackendDuration: backendDuration, + XactDuration: xactDuration, + QueryDuration: queryDuration, + } +} + +// String represents a Session as a string +func (s Session) String() string { + var output []string + if s.Pid != 0 { + output = append(output, fmt.Sprintf("pid=%d", s.Pid)) + } + if s.User != "" { + output = append(output, fmt.Sprintf("user=%s", s.User)) + } + if s.Db != "" { + output = append(output, fmt.Sprintf("db=%s", s.Db)) + } + if s.Client != "" { + output = append(output, fmt.Sprintf("client=%s", s.Client)) + } + if s.State != "" { + output = append(output, fmt.Sprintf("state=%s", s.State)) + } + if s.BackendDuration != 0 { + output = append(output, fmt.Sprintf("backend_duration=%f", s.BackendDuration)) + } + if s.XactDuration != 0 { + output = append(output, fmt.Sprintf("xact_duration=%f", s.XactDuration)) + } + if s.QueryDuration != 0 { + output = append(output, fmt.Sprintf("query_duration=%f", s.QueryDuration)) + } + if s.Query != "" { + output = append(output, fmt.Sprintf("query=%s", s.Query)) + } + return strings.Join(output, " ") +} diff --git a/base/utils.go b/base/utils.go new file mode 100644 index 0000000..a8e326b --- /dev/null +++ b/base/utils.go @@ -0,0 +1,12 @@ +package base + +import ( + "log" +) + +// Panic prints a non-nil error and terminates the program +func Panic(err error) { + if err != nil { + log.Fatalln(err) + } +} diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..14e1dce --- /dev/null +++ b/build.sh @@ -0,0 +1,16 @@ +#!/bin/bash +BINARY=pgterminate +VERSION=$(cat VERSION) +BUILD_PATH=/tmp/${BINARY}-${VERSION} +ldflags="-X main.AppVersion=${VERSION}" +GOOS=linux +GOARCH=amd64 + +export GOOS +export GOARCH + +go build -ldflags "$ldflags" -o ${BUILD_PATH}/${BINARY} cmd/${BINARY}/main.go +(cd ${BUILD_PATH} && tar czf ${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz ${BINARY}) + +echo "Archive created:" +ls -l ${BUILD_PATH}/${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz \ No newline at end of file diff --git a/cmd/pgterminate/main.go b/cmd/pgterminate/main.go new file mode 100644 index 0000000..6ebeaef --- /dev/null +++ b/cmd/pgterminate/main.go @@ -0,0 +1,140 @@ +package main + +import ( + "flag" + "fmt" + "github.com/jouir/pgterminate/base" + "github.com/jouir/pgterminate/notifier" + "github.com/jouir/pgterminate/terminator" + "golang.org/x/crypto/ssh/terminal" + "log" + "os" + "os/signal" + "strconv" + "sync" + "syscall" +) + +// AppVersion stores application version at compilation time +var AppVersion string + +func main() { + var err error + config := base.NewConfig() + + version := flag.Bool("version", false, "Print version") + flag.StringVar(&config.File, "config", "", "Configuration file") + flag.StringVar(&config.Host, "host", "", "Instance host address") + flag.IntVar(&config.Port, "port", 0, "Instance port") + flag.StringVar(&config.User, "user", "", "Instance username") + flag.StringVar(&config.Password, "password", "", "Instance password") + flag.StringVar(&config.Database, "database", "", "Instance database") + prompt := flag.Bool("prompt-password", false, "Prompt for password") + flag.Float64Var(&config.Interval, "interval", 1, "Time to sleep between iterations in seconds") + flag.IntVar(&config.ConnectTimeout, "connect-timeout", 3, "Connection timeout in seconds") + flag.Float64Var(&config.IdleTimeout, "idle-timeout", 0, "Time for idle connections to be terminated in seconds") + flag.Float64Var(&config.ActiveTimeout, "active-timeout", 0, "Time for active connections to be terminated in seconds") + flag.StringVar(&config.LogFile, "log-file", "", "Write logs to a file") + flag.StringVar(&config.PidFile, "pid-file", "", "Write process id into a file") + flag.Parse() + + if *version { + if AppVersion == "" { + AppVersion = "unknown" + } + fmt.Println(AppVersion) + return + } + + if *prompt { + fmt.Print("Password:") + bytes, err := terminal.ReadPassword(syscall.Stdin) + base.Panic(err) + config.Password = string(bytes) + fmt.Print("\n") + } + + if config.File != "" { + err = config.Read(config.File) + base.Panic(err) + } + + if config.ActiveTimeout == 0 && config.IdleTimeout == 0 { + log.Fatalln("Parameter -active-timeout or -idle-timeout required") + } + + if config.PidFile != "" { + writePid(config.PidFile) + defer removePid(config.PidFile) + } + + done := make(chan bool) + sessions := make(chan base.Session) + + ctx := base.NewContext(config, sessions, done) + terminator := terminator.NewTerminator(ctx) + notifier := notifier.NewNotifier(ctx) + + handleSignals(ctx, notifier) + + // Run managers asynchronously and wait for all of them to end + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + terminator.Run() + }() + go func() { + defer wg.Done() + notifier.Run() + }() + wg.Wait() +} + +// handleSignals handles operating system signals +func handleSignals(ctx *base.Context, n notifier.Notifier) { + // When interrupt or terminated, terminate managers, close channel and terminate program + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT) + signal.Notify(c, syscall.SIGTERM) + go func() { + for sig := range c { + log.Printf("Received %v signal\n", sig) + close(ctx.Sessions) + ctx.Done <- true + } + }() + + // When hangup, reload notifier + h := make(chan os.Signal, 1) + signal.Notify(h, syscall.SIGHUP) + go func() { + for sig := range h { + log.Printf("Received %v signal\n", sig) + ctx.Config.Reload() + n.Reload() + } + }() +} + +// writePid writes current pid into a pid file +func writePid(file string) { + log.Println("Creating pid file", file) + pid := strconv.Itoa(os.Getpid()) + + f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644) + base.Panic(err) + defer f.Close() + + _, err = f.WriteString(pid) + base.Panic(err) +} + +// removePid removes pid file +func removePid(file string) { + if _, err := os.Stat(file); err == nil { + log.Println("Removing pid file", file) + err := os.Remove(file) + base.Panic(err) + } +} diff --git a/config.yaml.example b/config.yaml.example new file mode 100644 index 0000000..267354c --- /dev/null +++ b/config.yaml.example @@ -0,0 +1,11 @@ +--- +host: 127.0.0.1 +port: 5432 +user: test +password: **** +interval: 1000 +connect-timeout: 3 +idle-timeout: 300 +active-timeout: 10 +log-file: /var/log/pgterminate/pgterminate.log +pid-file: /var/run/pgterminate/pgterminate.pid \ No newline at end of file diff --git a/notifier/console.go b/notifier/console.go new file mode 100644 index 0000000..4bee564 --- /dev/null +++ b/notifier/console.go @@ -0,0 +1,29 @@ +package notifier + +import ( + "github.com/jouir/pgterminate/base" + "log" +) + +// Console notifier structure +type Console struct { + sessions chan base.Session +} + +// NewConsole creates a console notifier +func NewConsole(sessions chan base.Session) Notifier { + return &Console{ + sessions: sessions, + } +} + +// Run starts console notifier +func (c *Console) Run() { + for session := range c.sessions { + log.Printf("%s", session) + } +} + +// Reload for handling SIGHUP signals +func (c *Console) Reload() { +} diff --git a/notifier/file.go b/notifier/file.go new file mode 100644 index 0000000..e74d282 --- /dev/null +++ b/notifier/file.go @@ -0,0 +1,55 @@ +package notifier + +import ( + "github.com/jouir/pgterminate/base" + "log" + "os" + "time" +) + +// File structure for file notifier +type File struct { + handle *os.File + name string + sessions chan base.Session +} + +// NewFile creates a file notifier +func NewFile(name string, sessions chan base.Session) Notifier { + return &File{ + name: name, + sessions: sessions, + } +} + +// Run starts the file notifier +func (f *File) Run() { + f.open() + defer f.terminate() + + for session := range f.sessions { + timestamp := time.Now().Format(time.RFC3339) + _, err := f.handle.WriteString(timestamp + " " + session.String() + "\n") + base.Panic(err) + } +} + +// open opens a log file +func (f *File) open() { + var err error + f.handle, err = os.OpenFile(f.name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + base.Panic(err) +} + +// Reload closes and re-open the file to be compatible with logrotate +func (f *File) Reload() { + log.Println("Re-opening log file", f.name) + f.handle.Close() + f.open() +} + +// terminate closes the file +func (f *File) terminate() { + log.Println("Closing log file", f.name) + f.handle.Close() +} diff --git a/notifier/notifier.go b/notifier/notifier.go new file mode 100644 index 0000000..9a8ea21 --- /dev/null +++ b/notifier/notifier.go @@ -0,0 +1,20 @@ +package notifier + +import ( + "github.com/jouir/pgterminate/base" +) + +// Notifier generic interface for implementing a notifier +type Notifier interface { + Run() + Reload() +} + +// NewNotifier looks into Config to create a File or Console notifier and pass it +// the session channel for consuming sessions structs sent by terminator +func NewNotifier(ctx *base.Context) Notifier { + if ctx.Config.LogFile != "" { + return NewFile(ctx.Config.LogFile, ctx.Sessions) + } + return NewConsole(ctx.Sessions) +} diff --git a/terminator/terminator.go b/terminator/terminator.go new file mode 100644 index 0000000..fde6ea2 --- /dev/null +++ b/terminator/terminator.go @@ -0,0 +1,94 @@ +package terminator + +import ( + "github.com/jouir/pgterminate/base" + "log" + "time" +) + +// Terminator looks for sessions, filters actives and idles, terminate them and notify sessions channel +// It ends itself gracefully when done channel is triggered +type Terminator struct { + config *base.Config + db *base.Db + sessions chan base.Session + done chan bool +} + +// NewTerminator instanciates a Terminator +func NewTerminator(ctx *base.Context) *Terminator { + return &Terminator{ + config: ctx.Config, + sessions: ctx.Sessions, + done: ctx.Done, + } +} + +// Run starts the Terminator +func (t *Terminator) Run() { + t.db = base.NewDb(t.config.Dsn()) + log.Println("Connecting to instance") + t.db.Connect() + defer t.terminate() + + for { + select { + case <-t.done: + return + default: + sessions := t.db.Sessions() + if t.config.ActiveTimeout != 0 { + actives := activeSessions(sessions, t.config.ActiveTimeout) + go t.terminateAndNotify(actives) + } + + if t.config.IdleTimeout != 0 { + idles := idleSessions(sessions, t.config.IdleTimeout) + go t.terminateAndNotify(idles) + } + time.Sleep(time.Duration(t.config.Interval*1000) * time.Millisecond) + } + + } +} + +// terminateAndNotify terminates a list of sessions and notifies channel +func (t *Terminator) terminateAndNotify(sessions []base.Session) { + t.db.TerminateSessions(sessions) + for _, session := range sessions { + t.sessions <- session + } +} + +// terminate terminates gracefully +func (t *Terminator) terminate() { + log.Println("Disconnecting from instance") + t.db.Disconnect() +} + +// activeSessions returns a list of active sessions +// A session is active when state is "active" and backend has started before elapsed +// seconds +func activeSessions(sessions []base.Session, elapsed float64) (result []base.Session) { + for _, session := range sessions { + if session.State == "active" && session.QueryDuration > elapsed { + result = append(result, session) + } + } + return result +} + +// idleSessions returns a list of idle sessions +// A sessions is idle when state is "idle" and backend has started before elapsed seconds +// and when state is "idle in transaction" or "idle in transaction (aborted)" and +// transaction has started before elapsed seconds +func idleSessions(sessions []base.Session, elapsed float64) (result []base.Session) { + for _, session := range sessions { + if session.State == "idle" && session.BackendDuration > elapsed { + result = append(result, session) + } else if (session.State == "idle in transaction" || session.State == "idle in transaction (aborted)") && session.XactDuration > elapsed { + result = append(result, session) + } + } + return result +}