Add exclude-listeners parameter

This commit is contained in:
Julien Riou 2018-07-08 23:48:48 +02:00
parent ebc48e3615
commit c0382eaad9
No known key found for this signature in database
GPG key ID: BA3E15176E45E85D
10 changed files with 47 additions and 22 deletions

View file

@ -90,5 +90,9 @@ When include users list or regex is set, `pgterminate` will focus on included us
When exclude users list or regex is set and no include option is set, `pgterminate` will terminate all sessions except excluded users. When exclude users list or regex is set and no include option is set, `pgterminate` will terminate all sessions except excluded users.
# Listeners
LISTEN queries are asynchronous. Sessions are set to "idle" state even if they are waiting for messages to be sent to the queue. `pgterminate` can exclude sessions in that state by looking at the last known query starting with "LISTEN", with the `exclude-listeners` parameter.
# License # License
`pgterminate` is released under [The Unlicense](LICENSE) license. Code is under public domain. `pgterminate` is released under [The Unlicense](LICENSE) license. Code is under public domain.

View file

@ -38,6 +38,7 @@ type Config struct {
ExcludeUsers StringFlags `yaml:"exclude-users"` ExcludeUsers StringFlags `yaml:"exclude-users"`
ExcludeUsersRegex string `yaml:"exclude-users-regex"` ExcludeUsersRegex string `yaml:"exclude-users-regex"`
ExcludeUsersRegexCompiled *regexp.Regexp ExcludeUsersRegexCompiled *regexp.Regexp
ExcludeListeners bool `yaml:"exclude-listeners"`
Cancel bool `yaml:"cancel"` Cancel bool `yaml:"cancel"`
} }

View file

@ -2,13 +2,13 @@ package base
// Context stores dynamic values like channels and exposes configuration // Context stores dynamic values like channels and exposes configuration
type Context struct { type Context struct {
Sessions chan Session Sessions chan *Session
Done chan bool Done chan bool
Config *Config Config *Config
} }
// NewContext instanciates a Context // NewContext instanciates a Context
func NewContext(config *Config, sessions chan Session, done chan bool) *Context { func NewContext(config *Config, sessions chan *Session, done chan bool) *Context {
return &Context{ return &Context{
Config: config, Config: config,
Sessions: sessions, Sessions: sessions,

View file

@ -42,7 +42,7 @@ func (db *Db) Disconnect() {
} }
// Sessions connects to the database and returns current sessions // Sessions connects to the database and returns current sessions
func (db *Db) Sessions() (sessions []Session) { func (db *Db) Sessions() (sessions []*Session) {
query := fmt.Sprintf(`select pid as pid, query := fmt.Sprintf(`select pid as pid,
usename as user, usename as user,
datname as db, datname as db,
@ -72,7 +72,7 @@ func (db *Db) Sessions() (sessions []Session) {
} }
// TerminateSessions terminates a list of sessions // TerminateSessions terminates a list of sessions
func (db *Db) TerminateSessions(sessions []Session) { func (db *Db) TerminateSessions(sessions []*Session) {
var pids []int64 var pids []int64
for _, session := range sessions { for _, session := range sessions {
pids = append(pids, session.Pid) pids = append(pids, session.Pid)
@ -86,7 +86,7 @@ func (db *Db) TerminateSessions(sessions []Session) {
} }
// CancelSessions terminates current query of a list of sessions // CancelSessions terminates current query of a list of sessions
func (db *Db) CancelSessions(sessions []Session) { func (db *Db) CancelSessions(sessions []*Session) {
var pids []int64 var pids []int64
for _, session := range sessions { for _, session := range sessions {
pids = append(pids, session.Pid) pids = append(pids, session.Pid)

View file

@ -17,8 +17,8 @@ type Session struct {
} }
// NewSession instanciates a Session // NewSession instanciates a Session
func NewSession(pid int64, user string, db string, client string, state string, query string, stateDuration float64) Session { func NewSession(pid int64, user string, db string, client string, state string, query string, stateDuration float64) *Session {
return Session{ return &Session{
Pid: pid, Pid: pid,
User: user, User: user,
Db: db, Db: db,
@ -30,7 +30,7 @@ func NewSession(pid int64, user string, db string, client string, state string,
} }
// String represents a Session as a string // String represents a Session as a string
func (s Session) String() string { func (s *Session) String() string {
var output []string var output []string
if s.Pid != 0 { if s.Pid != 0 {
output = append(output, fmt.Sprintf("pid=%d", s.Pid)) output = append(output, fmt.Sprintf("pid=%d", s.Pid))
@ -57,7 +57,7 @@ func (s Session) String() string {
} }
// IsIdle returns true when a session is doing nothing // IsIdle returns true when a session is doing nothing
func (s Session) IsIdle() bool { func (s *Session) IsIdle() bool {
if s.State == "idle" || s.State == "idle in transaction" || s.State == "idle in transaction (aborted)" { if s.State == "idle" || s.State == "idle in transaction" || s.State == "idle in transaction (aborted)" {
return true return true
} }

View file

@ -47,6 +47,7 @@ func main() {
flag.StringVar(&config.IncludeUsersRegex, "include-users-regex", "", "Terminate users matching this regexp") flag.StringVar(&config.IncludeUsersRegex, "include-users-regex", "", "Terminate users matching this regexp")
flag.Var(&config.ExcludeUsers, "exclude-user", "Ignore this user (can be called multiple times)") flag.Var(&config.ExcludeUsers, "exclude-user", "Ignore this user (can be called multiple times)")
flag.StringVar(&config.ExcludeUsersRegex, "exclude-users-regex", "", "Ignore users matching this regexp") flag.StringVar(&config.ExcludeUsersRegex, "exclude-users-regex", "", "Ignore users matching this regexp")
flag.BoolVar(&config.ExcludeListeners, "exclude-listeners", false, "Ignore sessions listening for events")
flag.BoolVar(&config.Cancel, "cancel", false, "Cancel sessions instead of terminate") flag.BoolVar(&config.Cancel, "cancel", false, "Cancel sessions instead of terminate")
flag.Parse() flag.Parse()
@ -107,7 +108,7 @@ func main() {
} }
done := make(chan bool) done := make(chan bool)
sessions := make(chan base.Session) sessions := make(chan *base.Session)
ctx := base.NewContext(config, sessions, done) ctx := base.NewContext(config, sessions, done)
terminator := terminator.NewTerminator(ctx) terminator := terminator.NewTerminator(ctx)

View file

@ -7,11 +7,11 @@ import (
// Console notifier structure // Console notifier structure
type Console struct { type Console struct {
sessions chan base.Session sessions chan *base.Session
} }
// NewConsole creates a console notifier // NewConsole creates a console notifier
func NewConsole(sessions chan base.Session) Notifier { func NewConsole(sessions chan *base.Session) Notifier {
return &Console{ return &Console{
sessions: sessions, sessions: sessions,
} }

View file

@ -12,12 +12,12 @@ import (
type File struct { type File struct {
handle *os.File handle *os.File
name string name string
sessions chan base.Session sessions chan *base.Session
mutex sync.Mutex mutex sync.Mutex
} }
// NewFile creates a file notifier // NewFile creates a file notifier
func NewFile(name string, sessions chan base.Session) Notifier { func NewFile(name string, sessions chan *base.Session) Notifier {
return &File{ return &File{
name: name, name: name,
sessions: sessions, sessions: sessions,

View file

@ -9,14 +9,14 @@ import (
// Syslog notifier // Syslog notifier
type Syslog struct { type Syslog struct {
sessions chan base.Session sessions chan *base.Session
ident string ident string
priority syslog.Priority priority syslog.Priority
writer *syslog.Writer writer *syslog.Writer
} }
// NewSyslog creates a syslog notifier // NewSyslog creates a syslog notifier
func NewSyslog(facility string, ident string, sessions chan base.Session) Notifier { func NewSyslog(facility string, ident string, sessions chan *base.Session) Notifier {
var priority syslog.Priority var priority syslog.Priority
switch facility { switch facility {
case "LOCAL0": case "LOCAL0":

View file

@ -3,6 +3,7 @@ package terminator
import ( import (
"github.com/jouir/pgterminate/base" "github.com/jouir/pgterminate/base"
"github.com/jouir/pgterminate/log" "github.com/jouir/pgterminate/log"
"strings"
"time" "time"
) )
@ -11,7 +12,7 @@ import (
type Terminator struct { type Terminator struct {
config *base.Config config *base.Config
db *base.Db db *base.Db
sessions chan base.Session sessions chan *base.Session
done chan bool done chan bool
} }
@ -64,16 +65,16 @@ func (t *Terminator) Run() {
} }
// notify sends sessions to channel // notify sends sessions to channel
func (t *Terminator) notify(sessions []base.Session) { func (t *Terminator) notify(sessions []*base.Session) {
for _, session := range sessions { for _, session := range sessions {
t.sessions <- session t.sessions <- session
} }
} }
// filter removes sessions according to include and exclude users settings // filterUsers removes sessions according to include and exclude users settings
// when include users slice and regex are not set, append all sessions except excluded users // when include users slice and regex are not set, append all sessions except excluded users
// otherwise, append included users // otherwise, append included users
func (t *Terminator) filter(sessions []base.Session) (filtered []base.Session) { func (t *Terminator) filterUsers(sessions []*base.Session) (filtered []*base.Session) {
includeUsers, includeRegex := t.config.IncludeUsers, t.config.IncludeUsersRegexCompiled includeUsers, includeRegex := t.config.IncludeUsers, t.config.IncludeUsersRegexCompiled
excludeUsers, excludeRegex := t.config.ExcludeUsers, t.config.ExcludeUsersRegexCompiled excludeUsers, excludeRegex := t.config.ExcludeUsers, t.config.ExcludeUsersRegexCompiled
@ -94,6 +95,24 @@ func (t *Terminator) filter(sessions []base.Session) (filtered []base.Session) {
return filtered return filtered
} }
// filterListeners excludes sessions with last query starting with "LISTEN"
func (t *Terminator) filterListeners(sessions []*base.Session) (filtered []*base.Session) {
for _, session := range sessions {
if (session.Query == "") || (session.Query != "" && !strings.HasPrefix(strings.ToUpper(session.Query), "LISTEN")) {
filtered = append(filtered, session)
}
}
return filtered
}
// filter executes all filter functions on a list of sessions
func (t *Terminator) filter(sessions []*base.Session) (filtered []*base.Session) {
filtered = sessions
filtered = t.filterUsers(filtered)
filtered = t.filterListeners(filtered)
return filtered
}
// terminate terminates gracefully // terminate terminates gracefully
func (t *Terminator) terminate() { func (t *Terminator) terminate() {
log.Info("Disconnecting from instance") log.Info("Disconnecting from instance")
@ -103,7 +122,7 @@ func (t *Terminator) terminate() {
// activeSessions returns a list of active sessions // activeSessions returns a list of active sessions
// A session is active when state is "active" and state has changed before elapsed seconds // A session is active when state is "active" and state has changed before elapsed seconds
// seconds // seconds
func activeSessions(sessions []base.Session, elapsed float64) (result []base.Session) { func activeSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) {
for _, session := range sessions { for _, session := range sessions {
if session.State == "active" && session.StateDuration > elapsed { if session.State == "active" && session.StateDuration > elapsed {
result = append(result, session) result = append(result, session)
@ -115,7 +134,7 @@ func activeSessions(sessions []base.Session, elapsed float64) (result []base.Ses
// idleSessions returns a list of idle sessions // idleSessions returns a list of idle sessions
// A sessions is idle when state is "idle", "idle in transaction" or "idle in transaction // A sessions is idle when state is "idle", "idle in transaction" or "idle in transaction
// (aborted)"and state has changed before elapsed seconds // (aborted)"and state has changed before elapsed seconds
func idleSessions(sessions []base.Session, elapsed float64) (result []base.Session) { func idleSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) {
for _, session := range sessions { for _, session := range sessions {
if session.IsIdle() && session.StateDuration > elapsed { if session.IsIdle() && session.StateDuration > elapsed {
result = append(result, session) result = append(result, session)