From 9818566369a691bb3ab1ed1a11caf7f36ffdea79 Mon Sep 17 00:00:00 2001 From: Julien Riou Date: Fri, 8 Mar 2019 18:14:22 +0100 Subject: [PATCH] Initial release --- README.md | 48 +++++++++++++++ VERSION | 1 + build.sh | 19 ++++++ config.yml.example | 13 +++++ src/backend.go | 101 ++++++++++++++++++++++++++++++++ src/cache.go | 94 ++++++++++++++++++++++++++++++ src/config.go | 64 ++++++++++++++++++++ src/frontend.go | 142 +++++++++++++++++++++++++++++++++++++++++++++ src/log.go | 74 +++++++++++++++++++++++ src/main.go | 78 +++++++++++++++++++++++++ 10 files changed, 634 insertions(+) create mode 100644 README.md create mode 100644 VERSION create mode 100755 build.sh create mode 100644 config.yml.example create mode 100644 src/backend.go create mode 100644 src/cache.go create mode 100644 src/config.go create mode 100644 src/frontend.go create mode 100644 src/log.go create mode 100644 src/main.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..27ca776 --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +# patroniglue +> Handle and cache basic Patroni API checks + +[Patroni](https://github.com/zalando/patroni) uses the built-in Python HTTP server to expose database states. It's perfect to be used by a load balancer like HAProxy to achieve high-availability. But, sometimes, this interface freezes. There's an [open issue](https://github.com/zalando/patroni/issues/857) we are trying to close actively. As production doesn't wait, `patroniglue` was created to offload those checks and release pressure by adding a little response cache. + +## Usage +Start process using a configuration file: +``` +patroniglue -config config.yml +``` +Add more logging output: +``` +patroniglue -config config.yml -verbose +``` +Print usage: +``` +patroniglue -help +``` + +## Configuration + +Configuration file format is YAML. + +* `frontend`: settings to handle incoming requests + * `host`: address to handle requests (localhost by default) + * `port`: port to handle requests (80 by default) + * `certfile`: path to SSL certificate file (will use HTTP by default if not provided) + * `keyfile`: path to SSL private key file (will use HTTP by default if not provided) +* `backend`: settings for sending requests to a backend + * `host`: patroni REST API `listen` address + * `port`: patroni REST API `listen` port + * `scheme`: patroni REST API scheme (either `http` or `https`) + * `insecure`: disable certificate checks on HTTPS requests +* `cache`: settings for the caching system + * `ttl`: time in second before response will be evinced + * `interval`: time in second used by the internal cache loop to check for keys to remove + +See [config.yml.example](config.yml.example) file for an example. + +## Internals + +* Frontend handles HTTP or HTTPS requests on "/master" and "/replica" routes available on Patroni API +* Backend requests Patroni API using HTTP or HTTPS protocol and exposes state to frontend +* Cache implements an in-memory key-value store to cache backend responses for some time + +## Build + +Run `./build.sh` script and enjoy! \ No newline at end of file diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..afaf360 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.0.0 \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..bd45391 --- /dev/null +++ b/build.sh @@ -0,0 +1,19 @@ +#!/bin/bash +BINARY=patroniglue +VERSION=$(cat VERSION) +BUILD_PATH=/tmp/${BINARY}-${VERSION} +ldflags="-X main.AppVersion=${VERSION}" +GOOS=linux +GOARCH=amd64 +DEPENDENCIES="github.com/gorilla/mux gopkg.in/yaml.v2" + +export GOOS +export GOARCH + +go get ${DEPENDENCIES} + +go build -ldflags "$ldflags" -o ${BUILD_PATH}/${BINARY} src/*.go +(cd ${BUILD_PATH} && tar czf ${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz ${BINARY}) + +echo "Archive created:" +ls -l ${BUILD_PATH}/${BINARY}-${VERSION}-${GOOS}-${GOARCH}.tar.gz diff --git a/config.yml.example b/config.yml.example new file mode 100644 index 0000000..a68a7ec --- /dev/null +++ b/config.yml.example @@ -0,0 +1,13 @@ +--- +frontend: + host: 127.0.0.1 + port: 8443 + certfile: /path/to/certificate.pem + keyfile: /pat/to/keyfile.key +backend: + host: 127.0.0.1 + port: 8008 + scheme: http +cache: + ttl: 1 + interval: 0.25 \ No newline at end of file diff --git a/src/backend.go b/src/backend.go new file mode 100644 index 0000000..60f0666 --- /dev/null +++ b/src/backend.go @@ -0,0 +1,101 @@ +package main + +import ( + "crypto/tls" + "fmt" + "net/http" +) + +// Backend connects to a backend (https) +// and forward requests from frontend +type Backend interface { + IsPrimary() (bool, error) + IsReplica() (bool, error) +} + +// NewBackend creates a backend from a driver, a connection string +// and an optional interval for caching values +func NewBackend(config BackendConfig, cache Cache) Backend { + if config.Host == "" { + config.Host = "localhost" + } + if config.Port == 0 { + config.Port = 80 + } + if config.Scheme == "" { + config.Scheme = "http" + } + + b := &HTTPBackend{ + host: config.Host, + port: config.Port, + scheme: config.Scheme, + cache: cache, + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: config.Insecure}, + } + b.client = &http.Client{Transport: tr} + return b +} + +// HTTPBackend will request a backend with HTTP(s) protocol +type HTTPBackend struct { + cache Cache + host string + port int + scheme string + client *http.Client +} + +func (b HTTPBackend) baseURL() string { + return fmt.Sprintf("%s://%s:%d", b.scheme, b.host, b.port) +} + +// request search into the cache to find the given key +// then eventually creates a HTTP/HTTPS request on backend and +// cache response for further requests +func (b HTTPBackend) request(key string) (bool, error) { + state, err := b.cache.Get(key) + if err != nil { + Warning("could not get key %s from cache: %v", key, err) + return false, err + } + + if state == nil { + url := b.baseURL() + "/" + key + + Debug("GET %s", url) + response, err := b.client.Get(url) + if err != nil { + Warning("could not request remote backend: %v", err) + return false, err + } + defer response.Body.Close() + + if response.StatusCode == http.StatusOK { + state = true + } else { + state = false + } + + err = b.cache.Set(key, state) + if err != nil { + Warning("could not save %s key to cache: %v", key, err) + return false, err + } + } + + return state.(bool), nil +} + +// IsPrimary will call /master route on patroni API +func (b HTTPBackend) IsPrimary() (bool, error) { + return b.request("master") +} + +// IsReplica will call /replica route on patroni API +func (b HTTPBackend) IsReplica() (bool, error) { + return b.request("replica") +} diff --git a/src/cache.go b/src/cache.go new file mode 100644 index 0000000..1472af8 --- /dev/null +++ b/src/cache.go @@ -0,0 +1,94 @@ +package main + +import ( + "sync" + "time" +) + +// Cache defines an interface to implement caching structure +type Cache interface { + Startup() + Get(string) (interface{}, error) + Set(string, interface{}) error +} + +// MemoryCache caches data in an in-memory key-value store with a ttl in seconds +type MemoryCache struct { + mutex sync.Mutex + enabled bool + datastore map[string]interface{} + expireStore map[string]time.Time + ttl float64 + interval float64 +} + +// NewCache creates a Cache instance +func NewCache(config CacheConfig) (Cache, error) { + enabled := false + if config.TTL > 0 { + enabled = true + } + if config.Interval == 0 { + config.Interval = 0.25 + } + return &MemoryCache{ + ttl: config.TTL, + interval: config.Interval, + enabled: enabled, + }, nil +} + +// Startup starts cache management threads +func (c *MemoryCache) Startup() { + Debug("starting memory cache") + c.datastore = make(map[string]interface{}) + c.expireStore = make(map[string]time.Time) + go c.expireThread() +} + +// expireThread flushes datastore every ttl seconds +func (c *MemoryCache) expireThread() { + Debug("starting cache expire thread") + if c.enabled { + for { + c.mutex.Lock() + for key := range c.expireStore { + if time.Since(c.expireStore[key]).Seconds() > c.ttl { + Debug("deleting key '%s' from cache", key) + delete(c.datastore, key) + delete(c.expireStore, key) + } + } + c.mutex.Unlock() + time.Sleep(time.Duration(c.interval*1000) * time.Millisecond) + } + } + Debug("ending cache expire thread") +} + +// Get a value from cache datastore +func (c *MemoryCache) Get(key string) (interface{}, error) { + if !c.enabled { + return nil, nil + } + value, ok := c.datastore[key] + if ok { + Debug("value for key '%s' found in cache", key) + return value, nil + } + Debug("value for key '%s' not found in cache", key) + return nil, nil +} + +// Set a value into cache datastore with a key +func (c *MemoryCache) Set(key string, value interface{}) error { + if !c.enabled { + return nil + } + Debug("setting value for key '%s' in cache", key) + c.mutex.Lock() + defer c.mutex.Unlock() + c.datastore[key] = value + c.expireStore[key] = time.Now() + return nil +} diff --git a/src/config.go b/src/config.go new file mode 100644 index 0000000..6be99bf --- /dev/null +++ b/src/config.go @@ -0,0 +1,64 @@ +package main + +import ( + "io/ioutil" + "path/filepath" + + "gopkg.in/yaml.v2" +) + +// Config stores configuration +type Config struct { + File string + Frontend FrontendConfig `yaml:"frontend"` + Backend BackendConfig `yaml:"backend"` + Cache CacheConfig `yaml:"cache"` +} + +// FrontendConfig for storing Frontend settings +type FrontendConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Certfile string `yaml:"certfile"` + Keyfile string `yaml:"keyfile"` + LogFormat string `yaml:"logformat"` +} + +// BackendConfig for storing Backend settings +type BackendConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Scheme string `yaml:"scheme"` + Insecure bool `yaml:"insecure"` +} + +// CacheConfig for storing Cache settings +type CacheConfig struct { + TTL float64 `yaml:"ttl"` + Interval float64 `yaml:"interval"` +} + +// NewConfig creates a Config +func NewConfig() *Config { + return &Config{} +} + +// ReadFile reads a configuration file and load settings to memory +func (c *Config) ReadFile(file string) error { + file, err := filepath.Abs(file) + if err != nil { + return err + } + + yamlFile, err := ioutil.ReadFile(file) + if err != nil { + return err + } + + err = yaml.Unmarshal(yamlFile, &c) + if err != nil { + return err + } + + return nil +} diff --git a/src/frontend.go b/src/frontend.go new file mode 100644 index 0000000..2e2f303 --- /dev/null +++ b/src/frontend.go @@ -0,0 +1,142 @@ +package main + +import ( + "fmt" + "io" + "net/http" + "strings" + + "github.com/gorilla/mux" +) + +// Frontend exposes statuses over HTTP(S) +type Frontend struct { + backend Backend + host string + port int + certfile string + keyfile string +} + +var backend Backend +var logFormat string + +// Start creates an HTTP server and listen +func (f *Frontend) Start() error { + Debug("creating router") + r := mux.NewRouter() + r.Use(loggingMiddleware) + r.Use(headersMiddleware) + + Debug("registering routes") + r.HandleFunc("/health", HealthHandler).Methods("GET") + r.HandleFunc("/master", PrimaryHandler).Methods("GET", "OPTIONS") + r.HandleFunc("/replica", ReplicaHandler).Methods("GET", "OPTIONS") + + Info("listening on %s", f) + var err error + if f.certfile != "" && f.keyfile != "" { + err = http.ListenAndServeTLS(f.String(), f.certfile, f.keyfile, r) + } else { + err = http.ListenAndServe(f.String(), r) + } + + if err != nil { + return err + } + + return nil +} + +func (f *Frontend) String() string { + return fmt.Sprintf("%s:%d", f.host, f.port) +} + +// NewFrontend creates a Frontend +func NewFrontend(config FrontendConfig, b Backend) (*Frontend, error) { + backend = b + logFormat = config.LogFormat + return &Frontend{ + host: config.Host, + port: config.Port, + certfile: config.Certfile, + keyfile: config.Keyfile, + }, nil +} + +// Log requests +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + Info(formatRequest(r, logFormat)) + next.ServeHTTP(w, r) + }) +} + +// formatRequest replaces request placeholders for logging purpose +func formatRequest(r *http.Request, format string) string { + if format == "" { + format = "%a - %m %U" + } + definitions := map[string]string{ + "%a": r.RemoteAddr, + "%m": r.Method, + "%U": r.RequestURI, + } + output := format + + for placeholder, value := range definitions { + output = strings.Replace(output, placeholder, value, -1) + } + + return output +} + +// Add headers +func headersMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + next.ServeHTTP(w, r) + }) +} + +// HealthHandler returns frontend health status +func HealthHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, `{"healthy": true}`) +} + +// PrimaryHandler exposes primary status +func PrimaryHandler(w http.ResponseWriter, r *http.Request) { + var message string + var status int + primary, err := backend.IsPrimary() + if err != nil { + message = fmt.Sprintf("{\"error\":\"%v\"}", err) + status = http.StatusServiceUnavailable + } + message = fmt.Sprintf("{\"primary\":%t}", primary) + status = http.StatusServiceUnavailable + if primary { + status = http.StatusOK + } + w.WriteHeader(status) + io.WriteString(w, message) +} + +// ReplicaHandler exposes replica status +func ReplicaHandler(w http.ResponseWriter, r *http.Request) { + var message string + var status int + replica, err := backend.IsReplica() + if err != nil { + message = fmt.Sprintf("{\"error\":\"%v\"}", err) + status = http.StatusServiceUnavailable + } + message = fmt.Sprintf("{\"replica\":%t}", replica) + status = http.StatusServiceUnavailable + if replica { + status = http.StatusOK + } + w.WriteHeader(status) + io.WriteString(w, message) +} diff --git a/src/log.go b/src/log.go new file mode 100644 index 0000000..971c3e8 --- /dev/null +++ b/src/log.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + "log" +) + +// LogLevel is the minimum level of log messages to print +type LogLevel int + +// Auto set log level +const ( + DEBUG LogLevel = iota + INFO + WARNING + ERROR + FATAL +) + +var level = INFO + +// SetLogLevel sets minimum log level to print +func SetLogLevel(logLevel string) error { + switch logLevel { + case "FATAL": + level = FATAL + case "ERROR": + level = ERROR + case "WARNING": + level = WARNING + case "INFO": + level = INFO + case "DEBUG": + level = DEBUG + default: + return fmt.Errorf("log level %s not allowed", logLevel) + } + return nil +} + +// Debug prints very verbose messages +func Debug(message string, args ...interface{}) { + if level <= DEBUG { + log.Printf("DEBUG: "+message, args...) + } +} + +// Info prints informative messages +func Info(message string, args ...interface{}) { + if level <= INFO { + log.Printf("INFO: "+message, args...) + } +} + +// Warning prints messages you should give attention +func Warning(message string, args ...interface{}) { + if level <= WARNING { + log.Printf("WARNING: "+message, args...) + } +} + +// Error prints impacting messages +func Error(message string, args ...interface{}) { + if level <= ERROR { + log.Printf("ERROR: "+message, args...) + } +} + +// Fatal prints a message and exit program +func Fatal(message string, args ...interface{}) { + if level <= FATAL { + log.Fatalf("FATAL: "+message, args...) + } +} diff --git a/src/main.go b/src/main.go new file mode 100644 index 0000000..7af1f78 --- /dev/null +++ b/src/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "flag" + "fmt" + "os" +) + +// AppName exposes application name globally +var AppName = "patroniglue" + +// AppVersion stores application version at compilation time +var AppVersion string + +func main() { + + var err error + config := NewConfig() + + // Argument handling + quiet := flag.Bool("quiet", false, "Quiet mode") + verbose := flag.Bool("verbose", false, "Verbose mode") + debug := flag.Bool("debug", false, "Debug mode") + version := flag.Bool("version", false, "Print version") + flag.StringVar(&config.File, "config", os.Getenv("HOME")+"/."+AppName+".yml", "Configuration file") + flag.Parse() + + // Print version and exit + if *version { + if AppVersion == "" { + AppVersion = "unknown" + } + fmt.Println(AppVersion) + return + } + + // Log level management + if *debug { + err = SetLogLevel("DEBUG") + } + if *verbose { + err = SetLogLevel("INFO") + } + if *quiet { + err = SetLogLevel("ERROR") + } + + if err != nil { + Fatal("could not set log level: %v", err) + } + + // Read configuration file + Debug("reading configuration file") + err = config.ReadFile(config.File) + if err != nil { + Fatal("could not read configuration file: %v", err) + } + + // Cache management + cache, err := NewCache(config.Cache) + if err != nil { + Fatal("could not create cache: %v", err) + } + cache.Startup() + + // Backend management + backend := NewBackend(config.Backend, cache) + + // Frontend management + frontend, err := NewFrontend(config.Frontend, backend) + if err != nil { + Fatal("could not create frontend: %v", err) + } + err = frontend.Start() + if err != nil { + Fatal("could not start frontend: %v", err) + } +}