Initial release
This commit is contained in:
parent
be71075e18
commit
9818566369
10 changed files with 634 additions and 0 deletions
48
README.md
Normal file
48
README.md
Normal file
|
@ -0,0 +1,48 @@
|
|||
# patroniglue
|
||||
> Handle and cache basic Patroni API checks
|
||||
|
||||
[Patroni](https://github.com/zalando/patroni) uses the built-in Python HTTP server to expose database states. It's perfect to be used by a load balancer like HAProxy to achieve high-availability. But, sometimes, this interface freezes. There's an [open issue](https://github.com/zalando/patroni/issues/857) we are trying to close actively. As production doesn't wait, `patroniglue` was created to offload those checks and release pressure by adding a little response cache.
|
||||
|
||||
## Usage
|
||||
Start process using a configuration file:
|
||||
```
|
||||
patroniglue -config config.yml
|
||||
```
|
||||
Add more logging output:
|
||||
```
|
||||
patroniglue -config config.yml -verbose
|
||||
```
|
||||
Print usage:
|
||||
```
|
||||
patroniglue -help
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
Configuration file format is YAML.
|
||||
|
||||
* `frontend`: settings to handle incoming requests
|
||||
* `host`: address to handle requests (localhost by default)
|
||||
* `port`: port to handle requests (80 by default)
|
||||
* `certfile`: path to SSL certificate file (will use HTTP by default if not provided)
|
||||
* `keyfile`: path to SSL private key file (will use HTTP by default if not provided)
|
||||
* `backend`: settings for sending requests to a backend
|
||||
* `host`: patroni REST API `listen` address
|
||||
* `port`: patroni REST API `listen` port
|
||||
* `scheme`: patroni REST API scheme (either `http` or `https`)
|
||||
* `insecure`: disable certificate checks on HTTPS requests
|
||||
* `cache`: settings for the caching system
|
||||
* `ttl`: time in second before response will be evinced
|
||||
* `interval`: time in second used by the internal cache loop to check for keys to remove
|
||||
|
||||
See [config.yml.example](config.yml.example) file for an example.
|
||||
|
||||
## Internals
|
||||
|
||||
* Frontend handles HTTP or HTTPS requests on "/master" and "/replica" routes available on Patroni API
|
||||
* Backend requests Patroni API using HTTP or HTTPS protocol and exposes state to frontend
|
||||
* Cache implements an in-memory key-value store to cache backend responses for some time
|
||||
|
||||
## Build
|
||||
|
||||
Run `./build.sh` script and enjoy!
|
1
VERSION
Normal file
1
VERSION
Normal file
|
@ -0,0 +1 @@
|
|||
1.0.0
|
19
build.sh
Executable file
19
build.sh
Executable file
|
@ -0,0 +1,19 @@
|
|||
#!/bin/bash
|
||||
BINARY=patroniglue
|
||||
VERSION=$(cat VERSION)
|
||||
BUILD_PATH=/tmp/${BINARY}-${VERSION}
|
||||
ldflags="-X main.AppVersion=${VERSION}"
|
||||
GOOS=linux
|
||||
GOARCH=amd64
|
||||
DEPENDENCIES="github.com/gorilla/mux gopkg.in/yaml.v2"
|
||||
|
||||
export GOOS
|
||||
export GOARCH
|
||||
|
||||
go get ${DEPENDENCIES}
|
||||
|
||||
go build -ldflags "$ldflags" -o ${BUILD_PATH}/${BINARY} src/*.go
|
||||
(cd ${BUILD_PATH} && tar czf ${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz ${BINARY})
|
||||
|
||||
echo "Archive created:"
|
||||
ls -l ${BUILD_PATH}/${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz
|
13
config.yml.example
Normal file
13
config.yml.example
Normal file
|
@ -0,0 +1,13 @@
|
|||
---
|
||||
frontend:
|
||||
host: 127.0.0.1
|
||||
port: 8443
|
||||
certfile: /path/to/certificate.pem
|
||||
keyfile: /pat/to/keyfile.key
|
||||
backend:
|
||||
host: 127.0.0.1
|
||||
port: 8008
|
||||
scheme: http
|
||||
cache:
|
||||
ttl: 1
|
||||
interval: 0.25
|
101
src/backend.go
Normal file
101
src/backend.go
Normal file
|
@ -0,0 +1,101 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// Backend connects to a backend (https)
|
||||
// and forward requests from frontend
|
||||
type Backend interface {
|
||||
IsPrimary() (bool, error)
|
||||
IsReplica() (bool, error)
|
||||
}
|
||||
|
||||
// NewBackend creates a backend from a driver, a connection string
|
||||
// and an optional interval for caching values
|
||||
func NewBackend(config BackendConfig, cache Cache) Backend {
|
||||
if config.Host == "" {
|
||||
config.Host = "localhost"
|
||||
}
|
||||
if config.Port == 0 {
|
||||
config.Port = 80
|
||||
}
|
||||
if config.Scheme == "" {
|
||||
config.Scheme = "http"
|
||||
}
|
||||
|
||||
b := &HTTPBackend{
|
||||
host: config.Host,
|
||||
port: config.Port,
|
||||
scheme: config.Scheme,
|
||||
cache: cache,
|
||||
}
|
||||
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.Insecure},
|
||||
}
|
||||
b.client = &http.Client{Transport: tr}
|
||||
return b
|
||||
}
|
||||
|
||||
// HTTPBackend will request a backend with HTTP(s) protocol
|
||||
type HTTPBackend struct {
|
||||
cache Cache
|
||||
host string
|
||||
port int
|
||||
scheme string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
func (b HTTPBackend) baseURL() string {
|
||||
return fmt.Sprintf("%s://%s:%d", b.scheme, b.host, b.port)
|
||||
}
|
||||
|
||||
// request search into the cache to find the given key
|
||||
// then eventually creates a HTTP/HTTPS request on backend and
|
||||
// cache response for further requests
|
||||
func (b HTTPBackend) request(key string) (bool, error) {
|
||||
state, err := b.cache.Get(key)
|
||||
if err != nil {
|
||||
Warning("could not get key %s from cache: %v", key, err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
if state == nil {
|
||||
url := b.baseURL() + "/" + key
|
||||
|
||||
Debug("GET %s", url)
|
||||
response, err := b.client.Get(url)
|
||||
if err != nil {
|
||||
Warning("could not request remote backend: %v", err)
|
||||
return false, err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.StatusCode == http.StatusOK {
|
||||
state = true
|
||||
} else {
|
||||
state = false
|
||||
}
|
||||
|
||||
err = b.cache.Set(key, state)
|
||||
if err != nil {
|
||||
Warning("could not save %s key to cache: %v", key, err)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
return state.(bool), nil
|
||||
}
|
||||
|
||||
// IsPrimary will call /master route on patroni API
|
||||
func (b HTTPBackend) IsPrimary() (bool, error) {
|
||||
return b.request("master")
|
||||
}
|
||||
|
||||
// IsReplica will call /replica route on patroni API
|
||||
func (b HTTPBackend) IsReplica() (bool, error) {
|
||||
return b.request("replica")
|
||||
}
|
94
src/cache.go
Normal file
94
src/cache.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Cache defines an interface to implement caching structure
|
||||
type Cache interface {
|
||||
Startup()
|
||||
Get(string) (interface{}, error)
|
||||
Set(string, interface{}) error
|
||||
}
|
||||
|
||||
// MemoryCache caches data in an in-memory key-value store with a ttl in seconds
|
||||
type MemoryCache struct {
|
||||
mutex sync.Mutex
|
||||
enabled bool
|
||||
datastore map[string]interface{}
|
||||
expireStore map[string]time.Time
|
||||
ttl float64
|
||||
interval float64
|
||||
}
|
||||
|
||||
// NewCache creates a Cache instance
|
||||
func NewCache(config CacheConfig) (Cache, error) {
|
||||
enabled := false
|
||||
if config.TTL > 0 {
|
||||
enabled = true
|
||||
}
|
||||
if config.Interval == 0 {
|
||||
config.Interval = 0.25
|
||||
}
|
||||
return &MemoryCache{
|
||||
ttl: config.TTL,
|
||||
interval: config.Interval,
|
||||
enabled: enabled,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Startup starts cache management threads
|
||||
func (c *MemoryCache) Startup() {
|
||||
Debug("starting memory cache")
|
||||
c.datastore = make(map[string]interface{})
|
||||
c.expireStore = make(map[string]time.Time)
|
||||
go c.expireThread()
|
||||
}
|
||||
|
||||
// expireThread flushes datastore every ttl seconds
|
||||
func (c *MemoryCache) expireThread() {
|
||||
Debug("starting cache expire thread")
|
||||
if c.enabled {
|
||||
for {
|
||||
c.mutex.Lock()
|
||||
for key := range c.expireStore {
|
||||
if time.Since(c.expireStore[key]).Seconds() > c.ttl {
|
||||
Debug("deleting key '%s' from cache", key)
|
||||
delete(c.datastore, key)
|
||||
delete(c.expireStore, key)
|
||||
}
|
||||
}
|
||||
c.mutex.Unlock()
|
||||
time.Sleep(time.Duration(c.interval*1000) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
Debug("ending cache expire thread")
|
||||
}
|
||||
|
||||
// Get a value from cache datastore
|
||||
func (c *MemoryCache) Get(key string) (interface{}, error) {
|
||||
if !c.enabled {
|
||||
return nil, nil
|
||||
}
|
||||
value, ok := c.datastore[key]
|
||||
if ok {
|
||||
Debug("value for key '%s' found in cache", key)
|
||||
return value, nil
|
||||
}
|
||||
Debug("value for key '%s' not found in cache", key)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Set a value into cache datastore with a key
|
||||
func (c *MemoryCache) Set(key string, value interface{}) error {
|
||||
if !c.enabled {
|
||||
return nil
|
||||
}
|
||||
Debug("setting value for key '%s' in cache", key)
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
c.datastore[key] = value
|
||||
c.expireStore[key] = time.Now()
|
||||
return nil
|
||||
}
|
64
src/config.go
Normal file
64
src/config.go
Normal file
|
@ -0,0 +1,64 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
// Config stores configuration
|
||||
type Config struct {
|
||||
File string
|
||||
Frontend FrontendConfig `yaml:"frontend"`
|
||||
Backend BackendConfig `yaml:"backend"`
|
||||
Cache CacheConfig `yaml:"cache"`
|
||||
}
|
||||
|
||||
// FrontendConfig for storing Frontend settings
|
||||
type FrontendConfig struct {
|
||||
Host string `yaml:"host"`
|
||||
Port int `yaml:"port"`
|
||||
Certfile string `yaml:"certfile"`
|
||||
Keyfile string `yaml:"keyfile"`
|
||||
LogFormat string `yaml:"logformat"`
|
||||
}
|
||||
|
||||
// BackendConfig for storing Backend settings
|
||||
type BackendConfig struct {
|
||||
Host string `yaml:"host"`
|
||||
Port int `yaml:"port"`
|
||||
Scheme string `yaml:"scheme"`
|
||||
Insecure bool `yaml:"insecure"`
|
||||
}
|
||||
|
||||
// CacheConfig for storing Cache settings
|
||||
type CacheConfig struct {
|
||||
TTL float64 `yaml:"ttl"`
|
||||
Interval float64 `yaml:"interval"`
|
||||
}
|
||||
|
||||
// NewConfig creates a Config
|
||||
func NewConfig() *Config {
|
||||
return &Config{}
|
||||
}
|
||||
|
||||
// ReadFile reads a configuration file and load settings to memory
|
||||
func (c *Config) ReadFile(file string) error {
|
||||
file, err := filepath.Abs(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
yamlFile, err := ioutil.ReadFile(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = yaml.Unmarshal(yamlFile, &c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
142
src/frontend.go
Normal file
142
src/frontend.go
Normal file
|
@ -0,0 +1,142 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// Frontend exposes statuses over HTTP(S)
|
||||
type Frontend struct {
|
||||
backend Backend
|
||||
host string
|
||||
port int
|
||||
certfile string
|
||||
keyfile string
|
||||
}
|
||||
|
||||
var backend Backend
|
||||
var logFormat string
|
||||
|
||||
// Start creates an HTTP server and listen
|
||||
func (f *Frontend) Start() error {
|
||||
Debug("creating router")
|
||||
r := mux.NewRouter()
|
||||
r.Use(loggingMiddleware)
|
||||
r.Use(headersMiddleware)
|
||||
|
||||
Debug("registering routes")
|
||||
r.HandleFunc("/health", HealthHandler).Methods("GET")
|
||||
r.HandleFunc("/master", PrimaryHandler).Methods("GET", "OPTIONS")
|
||||
r.HandleFunc("/replica", ReplicaHandler).Methods("GET", "OPTIONS")
|
||||
|
||||
Info("listening on %s", f)
|
||||
var err error
|
||||
if f.certfile != "" && f.keyfile != "" {
|
||||
err = http.ListenAndServeTLS(f.String(), f.certfile, f.keyfile, r)
|
||||
} else {
|
||||
err = http.ListenAndServe(f.String(), r)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Frontend) String() string {
|
||||
return fmt.Sprintf("%s:%d", f.host, f.port)
|
||||
}
|
||||
|
||||
// NewFrontend creates a Frontend
|
||||
func NewFrontend(config FrontendConfig, b Backend) (*Frontend, error) {
|
||||
backend = b
|
||||
logFormat = config.LogFormat
|
||||
return &Frontend{
|
||||
host: config.Host,
|
||||
port: config.Port,
|
||||
certfile: config.Certfile,
|
||||
keyfile: config.Keyfile,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Log requests
|
||||
func loggingMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
Info(formatRequest(r, logFormat))
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// formatRequest replaces request placeholders for logging purpose
|
||||
func formatRequest(r *http.Request, format string) string {
|
||||
if format == "" {
|
||||
format = "%a - %m %U"
|
||||
}
|
||||
definitions := map[string]string{
|
||||
"%a": r.RemoteAddr,
|
||||
"%m": r.Method,
|
||||
"%U": r.RequestURI,
|
||||
}
|
||||
output := format
|
||||
|
||||
for placeholder, value := range definitions {
|
||||
output = strings.Replace(output, placeholder, value, -1)
|
||||
}
|
||||
|
||||
return output
|
||||
}
|
||||
|
||||
// Add headers
|
||||
func headersMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// HealthHandler returns frontend health status
|
||||
func HealthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
io.WriteString(w, `{"healthy": true}`)
|
||||
}
|
||||
|
||||
// PrimaryHandler exposes primary status
|
||||
func PrimaryHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var message string
|
||||
var status int
|
||||
primary, err := backend.IsPrimary()
|
||||
if err != nil {
|
||||
message = fmt.Sprintf("{\"error\":\"%v\"}", err)
|
||||
status = http.StatusServiceUnavailable
|
||||
}
|
||||
message = fmt.Sprintf("{\"primary\":%t}", primary)
|
||||
status = http.StatusServiceUnavailable
|
||||
if primary {
|
||||
status = http.StatusOK
|
||||
}
|
||||
w.WriteHeader(status)
|
||||
io.WriteString(w, message)
|
||||
}
|
||||
|
||||
// ReplicaHandler exposes replica status
|
||||
func ReplicaHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var message string
|
||||
var status int
|
||||
replica, err := backend.IsReplica()
|
||||
if err != nil {
|
||||
message = fmt.Sprintf("{\"error\":\"%v\"}", err)
|
||||
status = http.StatusServiceUnavailable
|
||||
}
|
||||
message = fmt.Sprintf("{\"replica\":%t}", replica)
|
||||
status = http.StatusServiceUnavailable
|
||||
if replica {
|
||||
status = http.StatusOK
|
||||
}
|
||||
w.WriteHeader(status)
|
||||
io.WriteString(w, message)
|
||||
}
|
74
src/log.go
Normal file
74
src/log.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
// LogLevel is the minimum level of log messages to print
|
||||
type LogLevel int
|
||||
|
||||
// Auto set log level
|
||||
const (
|
||||
DEBUG LogLevel = iota
|
||||
INFO
|
||||
WARNING
|
||||
ERROR
|
||||
FATAL
|
||||
)
|
||||
|
||||
var level = INFO
|
||||
|
||||
// SetLogLevel sets minimum log level to print
|
||||
func SetLogLevel(logLevel string) error {
|
||||
switch logLevel {
|
||||
case "FATAL":
|
||||
level = FATAL
|
||||
case "ERROR":
|
||||
level = ERROR
|
||||
case "WARNING":
|
||||
level = WARNING
|
||||
case "INFO":
|
||||
level = INFO
|
||||
case "DEBUG":
|
||||
level = DEBUG
|
||||
default:
|
||||
return fmt.Errorf("log level %s not allowed", logLevel)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Debug prints very verbose messages
|
||||
func Debug(message string, args ...interface{}) {
|
||||
if level <= DEBUG {
|
||||
log.Printf("DEBUG: "+message, args...)
|
||||
}
|
||||
}
|
||||
|
||||
// Info prints informative messages
|
||||
func Info(message string, args ...interface{}) {
|
||||
if level <= INFO {
|
||||
log.Printf("INFO: "+message, args...)
|
||||
}
|
||||
}
|
||||
|
||||
// Warning prints messages you should give attention
|
||||
func Warning(message string, args ...interface{}) {
|
||||
if level <= WARNING {
|
||||
log.Printf("WARNING: "+message, args...)
|
||||
}
|
||||
}
|
||||
|
||||
// Error prints impacting messages
|
||||
func Error(message string, args ...interface{}) {
|
||||
if level <= ERROR {
|
||||
log.Printf("ERROR: "+message, args...)
|
||||
}
|
||||
}
|
||||
|
||||
// Fatal prints a message and exit program
|
||||
func Fatal(message string, args ...interface{}) {
|
||||
if level <= FATAL {
|
||||
log.Fatalf("FATAL: "+message, args...)
|
||||
}
|
||||
}
|
78
src/main.go
Normal file
78
src/main.go
Normal file
|
@ -0,0 +1,78 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// AppName exposes application name globally
|
||||
var AppName = "patroniglue"
|
||||
|
||||
// AppVersion stores application version at compilation time
|
||||
var AppVersion string
|
||||
|
||||
func main() {
|
||||
|
||||
var err error
|
||||
config := NewConfig()
|
||||
|
||||
// Argument handling
|
||||
quiet := flag.Bool("quiet", false, "Quiet mode")
|
||||
verbose := flag.Bool("verbose", false, "Verbose mode")
|
||||
debug := flag.Bool("debug", false, "Debug mode")
|
||||
version := flag.Bool("version", false, "Print version")
|
||||
flag.StringVar(&config.File, "config", os.Getenv("HOME")+"/."+AppName+".yml", "Configuration file")
|
||||
flag.Parse()
|
||||
|
||||
// Print version and exit
|
||||
if *version {
|
||||
if AppVersion == "" {
|
||||
AppVersion = "unknown"
|
||||
}
|
||||
fmt.Println(AppVersion)
|
||||
return
|
||||
}
|
||||
|
||||
// Log level management
|
||||
if *debug {
|
||||
err = SetLogLevel("DEBUG")
|
||||
}
|
||||
if *verbose {
|
||||
err = SetLogLevel("INFO")
|
||||
}
|
||||
if *quiet {
|
||||
err = SetLogLevel("ERROR")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
Fatal("could not set log level: %v", err)
|
||||
}
|
||||
|
||||
// Read configuration file
|
||||
Debug("reading configuration file")
|
||||
err = config.ReadFile(config.File)
|
||||
if err != nil {
|
||||
Fatal("could not read configuration file: %v", err)
|
||||
}
|
||||
|
||||
// Cache management
|
||||
cache, err := NewCache(config.Cache)
|
||||
if err != nil {
|
||||
Fatal("could not create cache: %v", err)
|
||||
}
|
||||
cache.Startup()
|
||||
|
||||
// Backend management
|
||||
backend := NewBackend(config.Backend, cache)
|
||||
|
||||
// Frontend management
|
||||
frontend, err := NewFrontend(config.Frontend, backend)
|
||||
if err != nil {
|
||||
Fatal("could not create frontend: %v", err)
|
||||
}
|
||||
err = frontend.Start()
|
||||
if err != nil {
|
||||
Fatal("could not start frontend: %v", err)
|
||||
}
|
||||
}
|
Reference in a new issue