From c0382eaad9b21c21e354ab53294a633d58137017 Mon Sep 17 00:00:00 2001 From: Julien Riou Date: Sun, 8 Jul 2018 23:48:48 +0200 Subject: [PATCH] Add exclude-listeners parameter --- README.md | 4 ++++ base/config.go | 1 + base/context.go | 4 ++-- base/db.go | 6 +++--- base/session.go | 8 ++++---- cmd/pgterminate/main.go | 3 ++- notifier/console.go | 4 ++-- notifier/file.go | 4 ++-- notifier/syslog.go | 4 ++-- terminator/terminator.go | 31 +++++++++++++++++++++++++------ 10 files changed, 47 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 8fdd9f1..6fc2a8e 100644 --- a/README.md +++ b/README.md @@ -90,5 +90,9 @@ When include users list or regex is set, `pgterminate` will focus on included us When exclude users list or regex is set and no include option is set, `pgterminate` will terminate all sessions except excluded users. +# Listeners + +LISTEN queries are asynchronous. Sessions are set to "idle" state even if they are waiting for messages to be sent to the queue. `pgterminate` can exclude sessions in that state by looking at the last known query starting with "LISTEN", with the `exclude-listeners` parameter. + # License `pgterminate` is released under [The Unlicense](LICENSE) license. Code is under public domain. diff --git a/base/config.go b/base/config.go index ee2eeb1..e005de1 100644 --- a/base/config.go +++ b/base/config.go @@ -38,6 +38,7 @@ type Config struct { ExcludeUsers StringFlags `yaml:"exclude-users"` ExcludeUsersRegex string `yaml:"exclude-users-regex"` ExcludeUsersRegexCompiled *regexp.Regexp + ExcludeListeners bool `yaml:"exclude-listeners"` Cancel bool `yaml:"cancel"` } diff --git a/base/context.go b/base/context.go index 6ccf5ed..50ecb20 100644 --- a/base/context.go +++ b/base/context.go @@ -2,13 +2,13 @@ package base // Context stores dynamic values like channels and exposes configuration type Context struct { - Sessions chan Session + Sessions chan *Session Done chan bool Config *Config } // NewContext instanciates a Context -func NewContext(config *Config, sessions chan Session, done chan bool) *Context { +func NewContext(config *Config, sessions chan *Session, done chan bool) *Context { return &Context{ Config: config, Sessions: sessions, diff --git a/base/db.go b/base/db.go index 67ad108..a628948 100644 --- a/base/db.go +++ b/base/db.go @@ -42,7 +42,7 @@ func (db *Db) Disconnect() { } // Sessions connects to the database and returns current sessions -func (db *Db) Sessions() (sessions []Session) { +func (db *Db) Sessions() (sessions []*Session) { query := fmt.Sprintf(`select pid as pid, usename as user, datname as db, @@ -72,7 +72,7 @@ func (db *Db) Sessions() (sessions []Session) { } // TerminateSessions terminates a list of sessions -func (db *Db) TerminateSessions(sessions []Session) { +func (db *Db) TerminateSessions(sessions []*Session) { var pids []int64 for _, session := range sessions { pids = append(pids, session.Pid) @@ -86,7 +86,7 @@ func (db *Db) TerminateSessions(sessions []Session) { } // CancelSessions terminates current query of a list of sessions -func (db *Db) CancelSessions(sessions []Session) { +func (db *Db) CancelSessions(sessions []*Session) { var pids []int64 for _, session := range sessions { pids = append(pids, session.Pid) diff --git a/base/session.go b/base/session.go index 8fdccc5..27f2135 100644 --- a/base/session.go +++ b/base/session.go @@ -17,8 +17,8 @@ type Session struct { } // NewSession instanciates a Session -func NewSession(pid int64, user string, db string, client string, state string, query string, stateDuration float64) Session { - return Session{ +func NewSession(pid int64, user string, db string, client string, state string, query string, stateDuration float64) *Session { + return &Session{ Pid: pid, User: user, Db: db, @@ -30,7 +30,7 @@ func NewSession(pid int64, user string, db string, client string, state string, } // String represents a Session as a string -func (s Session) String() string { +func (s *Session) String() string { var output []string if s.Pid != 0 { output = append(output, fmt.Sprintf("pid=%d", s.Pid)) @@ -57,7 +57,7 @@ func (s Session) String() string { } // IsIdle returns true when a session is doing nothing -func (s Session) IsIdle() bool { +func (s *Session) IsIdle() bool { if s.State == "idle" || s.State == "idle in transaction" || s.State == "idle in transaction (aborted)" { return true } diff --git a/cmd/pgterminate/main.go b/cmd/pgterminate/main.go index 25cbcbf..ac6e1d0 100644 --- a/cmd/pgterminate/main.go +++ b/cmd/pgterminate/main.go @@ -47,6 +47,7 @@ func main() { flag.StringVar(&config.IncludeUsersRegex, "include-users-regex", "", "Terminate users matching this regexp") flag.Var(&config.ExcludeUsers, "exclude-user", "Ignore this user (can be called multiple times)") flag.StringVar(&config.ExcludeUsersRegex, "exclude-users-regex", "", "Ignore users matching this regexp") + flag.BoolVar(&config.ExcludeListeners, "exclude-listeners", false, "Ignore sessions listening for events") flag.BoolVar(&config.Cancel, "cancel", false, "Cancel sessions instead of terminate") flag.Parse() @@ -107,7 +108,7 @@ func main() { } done := make(chan bool) - sessions := make(chan base.Session) + sessions := make(chan *base.Session) ctx := base.NewContext(config, sessions, done) terminator := terminator.NewTerminator(ctx) diff --git a/notifier/console.go b/notifier/console.go index 21a160d..480876d 100644 --- a/notifier/console.go +++ b/notifier/console.go @@ -7,11 +7,11 @@ import ( // Console notifier structure type Console struct { - sessions chan base.Session + sessions chan *base.Session } // NewConsole creates a console notifier -func NewConsole(sessions chan base.Session) Notifier { +func NewConsole(sessions chan *base.Session) Notifier { return &Console{ sessions: sessions, } diff --git a/notifier/file.go b/notifier/file.go index d4dc1a8..f57b328 100644 --- a/notifier/file.go +++ b/notifier/file.go @@ -12,12 +12,12 @@ import ( type File struct { handle *os.File name string - sessions chan base.Session + sessions chan *base.Session mutex sync.Mutex } // NewFile creates a file notifier -func NewFile(name string, sessions chan base.Session) Notifier { +func NewFile(name string, sessions chan *base.Session) Notifier { return &File{ name: name, sessions: sessions, diff --git a/notifier/syslog.go b/notifier/syslog.go index ab5d103..c599585 100644 --- a/notifier/syslog.go +++ b/notifier/syslog.go @@ -9,14 +9,14 @@ import ( // Syslog notifier type Syslog struct { - sessions chan base.Session + sessions chan *base.Session ident string priority syslog.Priority writer *syslog.Writer } // NewSyslog creates a syslog notifier -func NewSyslog(facility string, ident string, sessions chan base.Session) Notifier { +func NewSyslog(facility string, ident string, sessions chan *base.Session) Notifier { var priority syslog.Priority switch facility { case "LOCAL0": diff --git a/terminator/terminator.go b/terminator/terminator.go index 00ab241..707a567 100644 --- a/terminator/terminator.go +++ b/terminator/terminator.go @@ -3,6 +3,7 @@ package terminator import ( "github.com/jouir/pgterminate/base" "github.com/jouir/pgterminate/log" + "strings" "time" ) @@ -11,7 +12,7 @@ import ( type Terminator struct { config *base.Config db *base.Db - sessions chan base.Session + sessions chan *base.Session done chan bool } @@ -64,16 +65,16 @@ func (t *Terminator) Run() { } // notify sends sessions to channel -func (t *Terminator) notify(sessions []base.Session) { +func (t *Terminator) notify(sessions []*base.Session) { for _, session := range sessions { t.sessions <- session } } -// filter removes sessions according to include and exclude users settings +// filterUsers removes sessions according to include and exclude users settings // when include users slice and regex are not set, append all sessions except excluded users // otherwise, append included users -func (t *Terminator) filter(sessions []base.Session) (filtered []base.Session) { +func (t *Terminator) filterUsers(sessions []*base.Session) (filtered []*base.Session) { includeUsers, includeRegex := t.config.IncludeUsers, t.config.IncludeUsersRegexCompiled excludeUsers, excludeRegex := t.config.ExcludeUsers, t.config.ExcludeUsersRegexCompiled @@ -94,6 +95,24 @@ func (t *Terminator) filter(sessions []base.Session) (filtered []base.Session) { return filtered } +// filterListeners excludes sessions with last query starting with "LISTEN" +func (t *Terminator) filterListeners(sessions []*base.Session) (filtered []*base.Session) { + for _, session := range sessions { + if (session.Query == "") || (session.Query != "" && !strings.HasPrefix(strings.ToUpper(session.Query), "LISTEN")) { + filtered = append(filtered, session) + } + } + return filtered +} + +// filter executes all filter functions on a list of sessions +func (t *Terminator) filter(sessions []*base.Session) (filtered []*base.Session) { + filtered = sessions + filtered = t.filterUsers(filtered) + filtered = t.filterListeners(filtered) + return filtered +} + // terminate terminates gracefully func (t *Terminator) terminate() { log.Info("Disconnecting from instance") @@ -103,7 +122,7 @@ func (t *Terminator) terminate() { // activeSessions returns a list of active sessions // A session is active when state is "active" and state has changed before elapsed seconds // seconds -func activeSessions(sessions []base.Session, elapsed float64) (result []base.Session) { +func activeSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) { for _, session := range sessions { if session.State == "active" && session.StateDuration > elapsed { result = append(result, session) @@ -115,7 +134,7 @@ func activeSessions(sessions []base.Session, elapsed float64) (result []base.Ses // idleSessions returns a list of idle sessions // A sessions is idle when state is "idle", "idle in transaction" or "idle in transaction // (aborted)"and state has changed before elapsed seconds -func idleSessions(sessions []base.Session, elapsed float64) (result []base.Session) { +func idleSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) { for _, session := range sessions { if session.IsIdle() && session.StateDuration > elapsed { result = append(result, session)