Initial pgterminate code

This commit is contained in:
Julien Riou 2018-06-10 08:44:53 +02:00
parent 0487d635fc
commit 565c45a8fc
No known key found for this signature in database
GPG key ID: BA3E15176E45E85D
15 changed files with 697 additions and 0 deletions

11
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,11 @@
---
- repo: git://github.com/dnephin/pre-commit-golang
sha: HEAD
hooks:
- id: go-fmt
- id: go-lint
- repo: git://github.com/adrienverge/yamllint
sha: HEAD
hooks:
- id: yamllint

51
README.md Normal file
View file

@ -0,0 +1,51 @@
# pgterminate
> Terminates active and idle PostgreSQL backends
Did you encountered long-running queries locking down your entire company system because of a massive lock on the database? Or a scatterbrained developper connected to the production database with an open transaction leading to a production outage? Never? Really? Either you have very good policies and that's awesome, or you don't work in databases at all.
With `pgterminate`, you shouldn't be paged at night because some queries has locked down production for too long. It looks after "active" and "idle" connections and terminate them. As simple as that.
# Highlights
* `pgterminate` name is derived from `pg_terminate_backend` function, it terminates backends.
* backends are called sessions in `pgterminate`.
* `active` sessions are backends in `active` state for more than `active-timeout` seconds.
* `idle` sessions are backends in `idle`, `idle in transaction` or `idle in transaction (abort)` state for more than `idle-timeout` seconds.
* at least one of `active-timeout` and `idle-timeout` parameter is required, both can be used.
* `pgterminate` relies on `libpq` for PostgreSQL connection. When `-host` is ommited, connection via unix socket is used. When `-user` is ommited, the unix user is used. And so on.
* time parameters, like `connect-timeout`, `active-timeout`, `idle-timeout` and `interval`, are represented in seconds. They accept float value except for `connect-timeout` which is an integer.
* if you want `pgterminate` to terminate any session, ensure it has SUPERUSER privileges.
# Internals
## Signals
`pgterminate` handles the following OS signals:
* `SIGINT`, `SIGTERM` to gracefully terminates the infinite loop
* `SIGHUP` to reload configuration file and re-open log file if used (handy for logrotate)
## Configuration
There's two ways to configure `pgterminate`:
* command-line arguments
* configuration file with `-config` command-line argument
Configuration file options **override** command-line arguments
# Usage
Connect to a remote instance and prompt for password:
```
pgterminate -host 10.0.0.1 -port 5432 -user test -prompt-password -database test
```
Use a configuration file:
```
pgterminate -config config.yaml
```
Use both configuration file and command-line arguments:
```
pgterminate -config config.yaml -interval 0.25 -active-timeout 10 -idle-timeout 300
```
Print usage:
```
pgterminate -help
```
# License
`pgterminate` is released under [The Unlicense](https://github.com/jouir/pgterminate/blob/master/LICENSE) license. Code is under public domain.

1
VERSION Normal file
View file

@ -0,0 +1 @@
0.1.0

97
base/config.go Normal file
View file

@ -0,0 +1,97 @@
package base
import (
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"path/filepath"
"strings"
"sync"
)
// AppName exposes application name to config module
var AppName string
// Config receives configuration options
type Config struct {
mutex sync.Mutex
File string
Host string `yaml:"host"`
Port int `yaml:"port"`
User string `yaml:"user"`
Password string `yaml:"password"`
Database string `yaml:"database"`
Interval float64 `yaml:"interval"`
ConnectTimeout int `yaml:"connect-timeout"`
IdleTimeout float64 `yaml:"idle-timeout"`
ActiveTimeout float64 `yaml:"active-timeout"`
LogFile string `yaml:"log-file"`
PidFile string `yaml:"pid-file"`
}
func init() {
AppName = "pgterminate"
}
// NewConfig creates a Config object
func NewConfig() *Config {
return &Config{}
}
// Read loads options from a configuration file to Config
func (c *Config) Read(file string) error {
file, err := filepath.Abs(file)
if err != nil {
return err
}
yamlFile, err := ioutil.ReadFile(file)
if err != nil {
return err
}
err = yaml.Unmarshal(yamlFile, &c)
if err != nil {
return err
}
return nil
}
// Reload reads from file and update configuration
func (c *Config) Reload() {
log.Println("Reloading configuration")
c.mutex.Lock()
defer c.mutex.Unlock()
if c.File != "" {
c.Read(c.File)
}
}
// Dsn formats a connection string based on Config
func (c *Config) Dsn() string {
var parameters []string
if c.Host != "" {
parameters = append(parameters, fmt.Sprintf("host=%s", c.Host))
}
if c.Port != 0 {
parameters = append(parameters, fmt.Sprintf("port=%d", c.Port))
}
if c.User != "" {
parameters = append(parameters, fmt.Sprintf("user=%s", c.User))
}
if c.Password != "" {
parameters = append(parameters, fmt.Sprintf("password=%s", c.Password))
}
if c.Database != "" {
parameters = append(parameters, fmt.Sprintf("database=%s", c.Database))
}
if c.ConnectTimeout != 0 {
parameters = append(parameters, fmt.Sprintf("connect_timeout=%d", c.ConnectTimeout))
}
if AppName != "" {
parameters = append(parameters, fmt.Sprintf("application_name=%s", AppName))
}
return strings.Join(parameters, " ")
}

17
base/context.go Normal file
View file

@ -0,0 +1,17 @@
package base
// Context stores dynamic values like channels and exposes configuration
type Context struct {
Sessions chan Session
Done chan bool
Config *Config
}
// NewContext instanciates a Context
func NewContext(config *Config, sessions chan Session, done chan bool) *Context {
return &Context{
Config: config,
Sessions: sessions,
Done: done,
}
}

76
base/db.go Normal file
View file

@ -0,0 +1,76 @@
package base
import (
"database/sql"
"github.com/lib/pq"
"strconv"
)
const (
maxQueryLength = 1000
)
// Db centralizes connection to the database
type Db struct {
dsn string
conn *sql.DB
}
// NewDb creates a Db object
func NewDb(dsn string) *Db {
return &Db{
dsn: dsn,
}
}
// Connect connects to the instance and ping it to ensure connection is working
func (db *Db) Connect() {
conn, err := sql.Open("postgres", db.dsn)
Panic(err)
err = conn.Ping()
Panic(err)
db.conn = conn
}
// Disconnect ends connection cleanly
func (db *Db) Disconnect() {
err := db.conn.Close()
Panic(err)
}
// Sessions connects to the database and returns current sessions
func (db *Db) Sessions() (sessions []Session) {
query := `select pid as pid, usename as user, datname as db, host(client_addr)::text || ':' || client_port::text as client, state as state, substring(query from 1 for ` + strconv.Itoa(maxQueryLength) + `) as query, coalesce(extract(epoch from now() - backend_start), 0) as "backendDuration", coalesce(extract(epoch from now() - xact_start), 0) as "xactDuration", coalesce(extract(epoch from now() - query_start), 0) as "queryDuration" from pg_catalog.pg_stat_activity where pid <> pg_backend_pid();`
rows, err := db.conn.Query(query)
Panic(err)
defer rows.Close()
for rows.Next() {
var pid sql.NullInt64
var user, db, client, state, query sql.NullString
var backendDuration, xactDuration, queryDuration float64
err := rows.Scan(&pid, &user, &db, &client, &state, &query, &backendDuration, &xactDuration, &queryDuration)
Panic(err)
if pid.Valid && user.Valid && db.Valid && client.Valid && state.Valid && query.Valid {
sessions = append(sessions, NewSession(pid.Int64, user.String, db.String, client.String, state.String, query.String, backendDuration, xactDuration, queryDuration))
}
}
return sessions
}
// TerminateSessions terminates a list of sessions
func (db *Db) TerminateSessions(sessions []Session) {
var pids []int64
for _, session := range sessions {
pids = append(pids, session.Pid)
}
if len(pids) > 0 {
query := `select pg_terminate_backend(pid) from pg_stat_activity where pid = any($1);`
_, err := db.conn.Exec(query, pq.Array(pids))
Panic(err)
}
}

67
base/session.go Normal file
View file

@ -0,0 +1,67 @@
package base
import (
"fmt"
"strings"
)
// Session represents a PostgreSQL backend
type Session struct {
Pid int64
User string
Db string
Client string
State string
Query string
BackendDuration float64
XactDuration float64
QueryDuration float64
}
// NewSession instanciates a Session
func NewSession(pid int64, user string, db string, client string, state string, query string, backendDuration float64, xactDuration float64, queryDuration float64) Session {
return Session{
Pid: pid,
User: user,
Db: db,
Client: client,
State: state,
Query: query,
BackendDuration: backendDuration,
XactDuration: xactDuration,
QueryDuration: queryDuration,
}
}
// String represents a Session as a string
func (s Session) String() string {
var output []string
if s.Pid != 0 {
output = append(output, fmt.Sprintf("pid=%d", s.Pid))
}
if s.User != "" {
output = append(output, fmt.Sprintf("user=%s", s.User))
}
if s.Db != "" {
output = append(output, fmt.Sprintf("db=%s", s.Db))
}
if s.Client != "" {
output = append(output, fmt.Sprintf("client=%s", s.Client))
}
if s.State != "" {
output = append(output, fmt.Sprintf("state=%s", s.State))
}
if s.BackendDuration != 0 {
output = append(output, fmt.Sprintf("backend_duration=%f", s.BackendDuration))
}
if s.XactDuration != 0 {
output = append(output, fmt.Sprintf("xact_duration=%f", s.XactDuration))
}
if s.QueryDuration != 0 {
output = append(output, fmt.Sprintf("query_duration=%f", s.QueryDuration))
}
if s.Query != "" {
output = append(output, fmt.Sprintf("query=%s", s.Query))
}
return strings.Join(output, " ")
}

12
base/utils.go Normal file
View file

@ -0,0 +1,12 @@
package base
import (
"log"
)
// Panic prints a non-nil error and terminates the program
func Panic(err error) {
if err != nil {
log.Fatalln(err)
}
}

16
build.sh Executable file
View file

@ -0,0 +1,16 @@
#!/bin/bash
BINARY=pgterminate
VERSION=$(cat VERSION)
BUILD_PATH=/tmp/${BINARY}-${VERSION}
ldflags="-X main.AppVersion=${VERSION}"
GOOS=linux
GOARCH=amd64
export GOOS
export GOARCH
go build -ldflags "$ldflags" -o ${BUILD_PATH}/${BINARY} cmd/${BINARY}/main.go
(cd ${BUILD_PATH} && tar czf ${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz ${BINARY})
echo "Archive created:"
ls -l ${BUILD_PATH}/${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz

140
cmd/pgterminate/main.go Normal file
View file

@ -0,0 +1,140 @@
package main
import (
"flag"
"fmt"
"github.com/jouir/pgterminate/base"
"github.com/jouir/pgterminate/notifier"
"github.com/jouir/pgterminate/terminator"
"golang.org/x/crypto/ssh/terminal"
"log"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
)
// AppVersion stores application version at compilation time
var AppVersion string
func main() {
var err error
config := base.NewConfig()
version := flag.Bool("version", false, "Print version")
flag.StringVar(&config.File, "config", "", "Configuration file")
flag.StringVar(&config.Host, "host", "", "Instance host address")
flag.IntVar(&config.Port, "port", 0, "Instance port")
flag.StringVar(&config.User, "user", "", "Instance username")
flag.StringVar(&config.Password, "password", "", "Instance password")
flag.StringVar(&config.Database, "database", "", "Instance database")
prompt := flag.Bool("prompt-password", false, "Prompt for password")
flag.Float64Var(&config.Interval, "interval", 1, "Time to sleep between iterations in seconds")
flag.IntVar(&config.ConnectTimeout, "connect-timeout", 3, "Connection timeout in seconds")
flag.Float64Var(&config.IdleTimeout, "idle-timeout", 0, "Time for idle connections to be terminated in seconds")
flag.Float64Var(&config.ActiveTimeout, "active-timeout", 0, "Time for active connections to be terminated in seconds")
flag.StringVar(&config.LogFile, "log-file", "", "Write logs to a file")
flag.StringVar(&config.PidFile, "pid-file", "", "Write process id into a file")
flag.Parse()
if *version {
if AppVersion == "" {
AppVersion = "unknown"
}
fmt.Println(AppVersion)
return
}
if *prompt {
fmt.Print("Password:")
bytes, err := terminal.ReadPassword(syscall.Stdin)
base.Panic(err)
config.Password = string(bytes)
fmt.Print("\n")
}
if config.File != "" {
err = config.Read(config.File)
base.Panic(err)
}
if config.ActiveTimeout == 0 && config.IdleTimeout == 0 {
log.Fatalln("Parameter -active-timeout or -idle-timeout required")
}
if config.PidFile != "" {
writePid(config.PidFile)
defer removePid(config.PidFile)
}
done := make(chan bool)
sessions := make(chan base.Session)
ctx := base.NewContext(config, sessions, done)
terminator := terminator.NewTerminator(ctx)
notifier := notifier.NewNotifier(ctx)
handleSignals(ctx, notifier)
// Run managers asynchronously and wait for all of them to end
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
terminator.Run()
}()
go func() {
defer wg.Done()
notifier.Run()
}()
wg.Wait()
}
// handleSignals handles operating system signals
func handleSignals(ctx *base.Context, n notifier.Notifier) {
// When interrupt or terminated, terminate managers, close channel and terminate program
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
signal.Notify(c, syscall.SIGTERM)
go func() {
for sig := range c {
log.Printf("Received %v signal\n", sig)
close(ctx.Sessions)
ctx.Done <- true
}
}()
// When hangup, reload notifier
h := make(chan os.Signal, 1)
signal.Notify(h, syscall.SIGHUP)
go func() {
for sig := range h {
log.Printf("Received %v signal\n", sig)
ctx.Config.Reload()
n.Reload()
}
}()
}
// writePid writes current pid into a pid file
func writePid(file string) {
log.Println("Creating pid file", file)
pid := strconv.Itoa(os.Getpid())
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644)
base.Panic(err)
defer f.Close()
_, err = f.WriteString(pid)
base.Panic(err)
}
// removePid removes pid file
func removePid(file string) {
if _, err := os.Stat(file); err == nil {
log.Println("Removing pid file", file)
err := os.Remove(file)
base.Panic(err)
}
}

11
config.yaml.example Normal file
View file

@ -0,0 +1,11 @@
---
host: 127.0.0.1
port: 5432
user: test
password: ****
interval: 1000
connect-timeout: 3
idle-timeout: 300
active-timeout: 10
log-file: /var/log/pgterminate/pgterminate.log
pid-file: /var/run/pgterminate/pgterminate.pid

29
notifier/console.go Normal file
View file

@ -0,0 +1,29 @@
package notifier
import (
"github.com/jouir/pgterminate/base"
"log"
)
// Console notifier structure
type Console struct {
sessions chan base.Session
}
// NewConsole creates a console notifier
func NewConsole(sessions chan base.Session) Notifier {
return &Console{
sessions: sessions,
}
}
// Run starts console notifier
func (c *Console) Run() {
for session := range c.sessions {
log.Printf("%s", session)
}
}
// Reload for handling SIGHUP signals
func (c *Console) Reload() {
}

55
notifier/file.go Normal file
View file

@ -0,0 +1,55 @@
package notifier
import (
"github.com/jouir/pgterminate/base"
"log"
"os"
"time"
)
// File structure for file notifier
type File struct {
handle *os.File
name string
sessions chan base.Session
}
// NewFile creates a file notifier
func NewFile(name string, sessions chan base.Session) Notifier {
return &File{
name: name,
sessions: sessions,
}
}
// Run starts the file notifier
func (f *File) Run() {
f.open()
defer f.terminate()
for session := range f.sessions {
timestamp := time.Now().Format(time.RFC3339)
_, err := f.handle.WriteString(timestamp + " " + session.String() + "\n")
base.Panic(err)
}
}
// open opens a log file
func (f *File) open() {
var err error
f.handle, err = os.OpenFile(f.name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
base.Panic(err)
}
// Reload closes and re-open the file to be compatible with logrotate
func (f *File) Reload() {
log.Println("Re-opening log file", f.name)
f.handle.Close()
f.open()
}
// terminate closes the file
func (f *File) terminate() {
log.Println("Closing log file", f.name)
f.handle.Close()
}

20
notifier/notifier.go Normal file
View file

@ -0,0 +1,20 @@
package notifier
import (
"github.com/jouir/pgterminate/base"
)
// Notifier generic interface for implementing a notifier
type Notifier interface {
Run()
Reload()
}
// NewNotifier looks into Config to create a File or Console notifier and pass it
// the session channel for consuming sessions structs sent by terminator
func NewNotifier(ctx *base.Context) Notifier {
if ctx.Config.LogFile != "" {
return NewFile(ctx.Config.LogFile, ctx.Sessions)
}
return NewConsole(ctx.Sessions)
}

94
terminator/terminator.go Normal file
View file

@ -0,0 +1,94 @@
package terminator
import (
"github.com/jouir/pgterminate/base"
"log"
"time"
)
// Terminator looks for sessions, filters actives and idles, terminate them and notify sessions channel
// It ends itself gracefully when done channel is triggered
type Terminator struct {
config *base.Config
db *base.Db
sessions chan base.Session
done chan bool
}
// NewTerminator instanciates a Terminator
func NewTerminator(ctx *base.Context) *Terminator {
return &Terminator{
config: ctx.Config,
sessions: ctx.Sessions,
done: ctx.Done,
}
}
// Run starts the Terminator
func (t *Terminator) Run() {
t.db = base.NewDb(t.config.Dsn())
log.Println("Connecting to instance")
t.db.Connect()
defer t.terminate()
for {
select {
case <-t.done:
return
default:
sessions := t.db.Sessions()
if t.config.ActiveTimeout != 0 {
actives := activeSessions(sessions, t.config.ActiveTimeout)
go t.terminateAndNotify(actives)
}
if t.config.IdleTimeout != 0 {
idles := idleSessions(sessions, t.config.IdleTimeout)
go t.terminateAndNotify(idles)
}
time.Sleep(time.Duration(t.config.Interval*1000) * time.Millisecond)
}
}
}
// terminateAndNotify terminates a list of sessions and notifies channel
func (t *Terminator) terminateAndNotify(sessions []base.Session) {
t.db.TerminateSessions(sessions)
for _, session := range sessions {
t.sessions <- session
}
}
// terminate terminates gracefully
func (t *Terminator) terminate() {
log.Println("Disconnecting from instance")
t.db.Disconnect()
}
// activeSessions returns a list of active sessions
// A session is active when state is "active" and backend has started before elapsed
// seconds
func activeSessions(sessions []base.Session, elapsed float64) (result []base.Session) {
for _, session := range sessions {
if session.State == "active" && session.QueryDuration > elapsed {
result = append(result, session)
}
}
return result
}
// idleSessions returns a list of idle sessions
// A sessions is idle when state is "idle" and backend has started before elapsed seconds
// and when state is "idle in transaction" or "idle in transaction (aborted)" and
// transaction has started before elapsed seconds
func idleSessions(sessions []base.Session, elapsed float64) (result []base.Session) {
for _, session := range sessions {
if session.State == "idle" && session.BackendDuration > elapsed {
result = append(result, session)
} else if (session.State == "idle in transaction" || session.State == "idle in transaction (aborted)") && session.XactDuration > elapsed {
result = append(result, session)
}
}
return result
}