Feature(API): logs and traffic support websocket

This commit is contained in:
gVisor bot 2019-07-12 15:44:12 +08:00
parent f027c3e540
commit 19494fe383
2 changed files with 69 additions and 19 deletions

View file

@ -1,6 +1,7 @@
package route package route
import ( import (
"bytes"
"encoding/json" "encoding/json"
"net/http" "net/http"
"strings" "strings"
@ -12,6 +13,7 @@ import (
"github.com/go-chi/chi" "github.com/go-chi/chi"
"github.com/go-chi/cors" "github.com/go-chi/cors"
"github.com/go-chi/render" "github.com/go-chi/render"
"github.com/gorilla/websocket"
) )
var ( var (
@ -19,6 +21,8 @@ var (
serverAddr = "" serverAddr = ""
uiPath = "" uiPath = ""
upgrader = websocket.Upgrader{}
) )
type Traffic struct { type Traffic struct {
@ -47,15 +51,12 @@ func Start(addr string, secret string) {
MaxAge: 300, MaxAge: 300,
}) })
root := chi.NewRouter().With(jsonContentType)
root.Get("/traffic", traffic)
root.Get("/logs", getLogs)
r.Get("/", hello) r.Get("/", hello)
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {
r.Use(cors.Handler, authentication) r.Use(cors.Handler, authentication)
r.Mount("/", root) r.Get("/logs", getLogs)
r.Get("/traffic", traffic)
r.Mount("/configs", configRouter()) r.Mount("/configs", configRouter())
r.Mount("/proxies", proxyRouter()) r.Mount("/proxies", proxyRouter())
r.Mount("/rules", ruleRouter()) r.Mount("/rules", ruleRouter())
@ -78,14 +79,6 @@ func Start(addr string, secret string) {
} }
} }
func jsonContentType(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
next.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
func authentication(next http.Handler) http.Handler { func authentication(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) { fn := func(w http.ResponseWriter, r *http.Request) {
header := r.Header.Get("Authorization") header := r.Header.Get("Authorization")
@ -113,19 +106,44 @@ func hello(w http.ResponseWriter, r *http.Request) {
} }
func traffic(w http.ResponseWriter, r *http.Request) { func traffic(w http.ResponseWriter, r *http.Request) {
var wsConn *websocket.Conn
if websocket.IsWebSocketUpgrade(r) {
var err error
wsConn, err = upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
}
if wsConn == nil {
w.Header().Set("Content-Type", "application/json")
render.Status(r, http.StatusOK) render.Status(r, http.StatusOK)
}
tick := time.NewTicker(time.Second) tick := time.NewTicker(time.Second)
t := T.Instance().Traffic() t := T.Instance().Traffic()
buf := &bytes.Buffer{}
var err error
for range tick.C { for range tick.C {
buf.Reset()
up, down := t.Now() up, down := t.Now()
if err := json.NewEncoder(w).Encode(Traffic{ if err := json.NewEncoder(buf).Encode(Traffic{
Up: up, Up: up,
Down: down, Down: down,
}); err != nil { }); err != nil {
break break
} }
if wsConn == nil {
_, err = w.Write(buf.Bytes())
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} else {
err = wsConn.WriteMessage(websocket.TextMessage, buf.Bytes())
}
if err != nil {
break
}
} }
} }
@ -147,20 +165,47 @@ func getLogs(w http.ResponseWriter, r *http.Request) {
return return
} }
sub := log.Subscribe() var wsConn *websocket.Conn
if websocket.IsWebSocketUpgrade(r) {
var err error
wsConn, err = upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
}
if wsConn == nil {
w.Header().Set("Content-Type", "application/json")
render.Status(r, http.StatusOK) render.Status(r, http.StatusOK)
}
sub := log.Subscribe()
defer log.UnSubscribe(sub)
buf := &bytes.Buffer{}
var err error
for elm := range sub { for elm := range sub {
buf.Reset()
log := elm.(*log.Event) log := elm.(*log.Event)
if log.LogLevel < level { if log.LogLevel < level {
continue continue
} }
if err := json.NewEncoder(w).Encode(Log{ if err := json.NewEncoder(buf).Encode(Log{
Type: log.Type(), Type: log.Type(),
Payload: log.Payload, Payload: log.Payload,
}); err != nil { }); err != nil {
break break
} }
if wsConn == nil {
_, err = w.Write(buf.Bytes())
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} else {
err = wsConn.WriteMessage(websocket.TextMessage, buf.Bytes())
}
if err != nil {
break
}
} }
} }

View file

@ -62,6 +62,11 @@ func Subscribe() observable.Subscription {
return sub return sub
} }
func UnSubscribe(sub observable.Subscription) {
source.UnSubscribe(sub)
return
}
func Level() LogLevel { func Level() LogLevel {
return level return level
} }