pgterminate/terminator/terminator.go

186 lines
4.7 KiB
Go
Raw Normal View History

2018-06-10 08:44:53 +02:00
package terminator
import (
2018-07-08 23:48:48 +02:00
"strings"
2018-06-10 08:44:53 +02:00
"time"
"github.com/jouir/pgterminate/base"
"github.com/jouir/pgterminate/log"
2018-06-10 08:44:53 +02:00
)
// Terminator looks for sessions, filters actives and idles, terminate them and notify sessions channel
// It ends itself gracefully when done channel is triggered
type Terminator struct {
config *base.Config
db *base.Db
2018-07-08 23:48:48 +02:00
sessions chan *base.Session
2018-06-10 08:44:53 +02:00
done chan bool
}
// NewTerminator instanciates a Terminator
func NewTerminator(ctx *base.Context) *Terminator {
return &Terminator{
config: ctx.Config,
sessions: ctx.Sessions,
done: ctx.Done,
}
}
// Run starts the Terminator
func (t *Terminator) Run() {
2018-06-24 17:49:48 +02:00
log.Info("Starting terminator")
2018-06-10 08:44:53 +02:00
t.db = base.NewDb(t.config.Dsn())
2018-06-24 17:49:48 +02:00
log.Info("Connecting to instance")
2018-06-10 08:44:53 +02:00
t.db.Connect()
defer t.terminate()
for {
select {
case <-t.done:
return
default:
sessions := t.db.Sessions()
// Cancel or terminate active sessions
2018-06-10 08:44:53 +02:00
if t.config.ActiveTimeout != 0 {
actives := t.filter(activeSessions(sessions, t.config.ActiveTimeout))
if t.config.Cancel {
t.db.CancelSessions(actives)
} else {
t.db.TerminateSessions(actives)
}
t.notify(actives)
2018-06-10 08:44:53 +02:00
}
// Terminate idle sessions
2018-06-10 08:44:53 +02:00
if t.config.IdleTimeout != 0 {
idles := t.filter(idleSessions(sessions, t.config.IdleTimeout))
t.db.TerminateSessions(idles)
t.notify(idles)
2018-06-10 08:44:53 +02:00
}
2018-06-10 08:44:53 +02:00
time.Sleep(time.Duration(t.config.Interval*1000) * time.Millisecond)
}
}
}
// notify sends sessions to channel
2018-07-08 23:48:48 +02:00
func (t *Terminator) notify(sessions []*base.Session) {
2018-06-10 08:44:53 +02:00
for _, session := range sessions {
t.sessions <- session
}
}
// filterListeners excludes sessions with last query starting with "LISTEN"
func (t *Terminator) filterListeners(sessions []*base.Session) (filtered []*base.Session) {
2018-06-30 10:44:58 +02:00
for _, session := range sessions {
if (session.Query == "") || (session.Query != "" && !strings.HasPrefix(strings.ToUpper(session.Query), "LISTEN")) {
filtered = append(filtered, session)
}
}
return filtered
}
// filterUsers include and exclude users based on filters
func (t *Terminator) filterUsers(sessions []*base.Session) []*base.Session {
var included []*base.Session
if t.config.IncludeUsersFilters == nil {
included = sessions
} else {
for _, filter := range t.config.IncludeUsersFilters {
for _, session := range sessions {
if filter.Include(session.User) {
included = append(included, session)
}
2018-06-30 10:44:58 +02:00
}
}
}
var filtered []*base.Session
if t.config.ExcludeUsersFilters == nil {
filtered = included
} else {
for _, filter := range t.config.ExcludeUsersFilters {
for _, session := range included {
if filter.Include(session.User) {
filtered = append(filtered, session)
}
2018-06-30 10:44:58 +02:00
}
}
}
return filtered
}
// filterUsers include and exclude databases based on filters
func (t *Terminator) filterDatabases(sessions []*base.Session) []*base.Session {
var included []*base.Session
if t.config.IncludeDatabasesFilters == nil {
included = sessions
} else {
for _, filter := range t.config.IncludeDatabasesFilters {
for _, session := range sessions {
if filter.Include(session.Db) {
included = append(included, session)
}
}
}
}
var filtered []*base.Session
if t.config.ExcludeDatabasesFilters == nil {
filtered = included
} else {
for _, filter := range t.config.ExcludeDatabasesFilters {
for _, session := range included {
if filter.Include(session.Db) {
filtered = append(filtered, session)
}
}
2018-07-08 23:48:48 +02:00
}
}
2018-07-08 23:48:48 +02:00
return filtered
}
// filter executes all filter functions on a list of sessions
func (t *Terminator) filter(sessions []*base.Session) (filtered []*base.Session) {
filtered = t.filterListeners(sessions)
2018-07-08 23:48:48 +02:00
filtered = t.filterUsers(filtered)
filtered = t.filterDatabases(filtered)
2018-07-08 23:48:48 +02:00
return filtered
}
2018-06-10 08:44:53 +02:00
// terminate terminates gracefully
func (t *Terminator) terminate() {
2018-06-24 17:49:48 +02:00
log.Info("Disconnecting from instance")
2018-06-10 08:44:53 +02:00
t.db.Disconnect()
}
// activeSessions returns a list of active sessions
// A session is active when state is "active" and state has changed before elapsed seconds
2018-06-10 08:44:53 +02:00
// seconds
2018-07-08 23:48:48 +02:00
func activeSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) {
2018-06-10 08:44:53 +02:00
for _, session := range sessions {
if session.State == "active" && session.StateDuration > elapsed {
2018-06-10 08:44:53 +02:00
result = append(result, session)
}
}
return result
}
// idleSessions returns a list of idle sessions
// A sessions is idle when state is "idle", "idle in transaction" or "idle in transaction
// (aborted)"and state has changed before elapsed seconds
2018-07-08 23:48:48 +02:00
func idleSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) {
2018-06-10 08:44:53 +02:00
for _, session := range sessions {
if session.IsIdle() && session.StateDuration > elapsed {
2018-06-10 08:44:53 +02:00
result = append(result, session)
}
}
return result
}