Julien Riou
8709ee542b
- Add filters to include and exclude strings - Use filters to include and exclude sessions (user and databases supported) - Add tests to filters and terminator Signed-off-by: Julien Riou <julien@riou.xyz>
185 lines
4.7 KiB
Go
185 lines
4.7 KiB
Go
package terminator
|
|
|
|
import (
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jouir/pgterminate/base"
|
|
"github.com/jouir/pgterminate/log"
|
|
)
|
|
|
|
// Terminator looks for sessions, filters actives and idles, terminate them and notify sessions channel
|
|
// It ends itself gracefully when done channel is triggered
|
|
type Terminator struct {
|
|
config *base.Config
|
|
db *base.Db
|
|
sessions chan *base.Session
|
|
done chan bool
|
|
}
|
|
|
|
// NewTerminator instanciates a Terminator
|
|
func NewTerminator(ctx *base.Context) *Terminator {
|
|
return &Terminator{
|
|
config: ctx.Config,
|
|
sessions: ctx.Sessions,
|
|
done: ctx.Done,
|
|
}
|
|
}
|
|
|
|
// Run starts the Terminator
|
|
func (t *Terminator) Run() {
|
|
log.Info("Starting terminator")
|
|
t.db = base.NewDb(t.config.Dsn())
|
|
log.Info("Connecting to instance")
|
|
t.db.Connect()
|
|
defer t.terminate()
|
|
|
|
for {
|
|
select {
|
|
case <-t.done:
|
|
return
|
|
default:
|
|
sessions := t.db.Sessions()
|
|
|
|
// Cancel or terminate active sessions
|
|
if t.config.ActiveTimeout != 0 {
|
|
actives := t.filter(activeSessions(sessions, t.config.ActiveTimeout))
|
|
if t.config.Cancel {
|
|
t.db.CancelSessions(actives)
|
|
} else {
|
|
t.db.TerminateSessions(actives)
|
|
}
|
|
t.notify(actives)
|
|
}
|
|
|
|
// Terminate idle sessions
|
|
if t.config.IdleTimeout != 0 {
|
|
idles := t.filter(idleSessions(sessions, t.config.IdleTimeout))
|
|
t.db.TerminateSessions(idles)
|
|
t.notify(idles)
|
|
}
|
|
|
|
time.Sleep(time.Duration(t.config.Interval*1000) * time.Millisecond)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// notify sends sessions to channel
|
|
func (t *Terminator) notify(sessions []*base.Session) {
|
|
for _, session := range sessions {
|
|
t.sessions <- session
|
|
}
|
|
}
|
|
|
|
// filterListeners excludes sessions with last query starting with "LISTEN"
|
|
func (t *Terminator) filterListeners(sessions []*base.Session) (filtered []*base.Session) {
|
|
for _, session := range sessions {
|
|
if (session.Query == "") || (session.Query != "" && !strings.HasPrefix(strings.ToUpper(session.Query), "LISTEN")) {
|
|
filtered = append(filtered, session)
|
|
}
|
|
}
|
|
return filtered
|
|
}
|
|
|
|
// filterUsers include and exclude users based on filters
|
|
func (t *Terminator) filterUsers(sessions []*base.Session) []*base.Session {
|
|
|
|
var included []*base.Session
|
|
if t.config.IncludeUsersFilters == nil {
|
|
included = sessions
|
|
} else {
|
|
for _, filter := range t.config.IncludeUsersFilters {
|
|
for _, session := range sessions {
|
|
if filter.Include(session.User) {
|
|
included = append(included, session)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var filtered []*base.Session
|
|
if t.config.ExcludeUsersFilters == nil {
|
|
filtered = included
|
|
} else {
|
|
for _, filter := range t.config.ExcludeUsersFilters {
|
|
for _, session := range included {
|
|
if filter.Include(session.User) {
|
|
filtered = append(filtered, session)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return filtered
|
|
}
|
|
|
|
// filterUsers include and exclude databases based on filters
|
|
func (t *Terminator) filterDatabases(sessions []*base.Session) []*base.Session {
|
|
|
|
var included []*base.Session
|
|
if t.config.IncludeDatabasesFilters == nil {
|
|
included = sessions
|
|
} else {
|
|
for _, filter := range t.config.IncludeDatabasesFilters {
|
|
for _, session := range sessions {
|
|
if filter.Include(session.Db) {
|
|
included = append(included, session)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var filtered []*base.Session
|
|
if t.config.ExcludeDatabasesFilters == nil {
|
|
filtered = included
|
|
} else {
|
|
for _, filter := range t.config.ExcludeDatabasesFilters {
|
|
for _, session := range included {
|
|
if filter.Include(session.Db) {
|
|
filtered = append(filtered, session)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return filtered
|
|
}
|
|
|
|
// filter executes all filter functions on a list of sessions
|
|
func (t *Terminator) filter(sessions []*base.Session) (filtered []*base.Session) {
|
|
filtered = t.filterListeners(sessions)
|
|
filtered = t.filterUsers(filtered)
|
|
filtered = t.filterDatabases(filtered)
|
|
return filtered
|
|
}
|
|
|
|
// terminate terminates gracefully
|
|
func (t *Terminator) terminate() {
|
|
log.Info("Disconnecting from instance")
|
|
t.db.Disconnect()
|
|
}
|
|
|
|
// activeSessions returns a list of active sessions
|
|
// A session is active when state is "active" and state has changed before elapsed seconds
|
|
// seconds
|
|
func activeSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) {
|
|
for _, session := range sessions {
|
|
if session.State == "active" && session.StateDuration > elapsed {
|
|
result = append(result, session)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// idleSessions returns a list of idle sessions
|
|
// A sessions is idle when state is "idle", "idle in transaction" or "idle in transaction
|
|
// (aborted)"and state has changed before elapsed seconds
|
|
func idleSessions(sessions []*base.Session, elapsed float64) (result []*base.Session) {
|
|
for _, session := range sessions {
|
|
if session.IsIdle() && session.StateDuration > elapsed {
|
|
result = append(result, session)
|
|
}
|
|
}
|
|
return result
|
|
}
|