2022-05-13 13:26:45 -04:00
package monitor
import (
"strconv"
2022-06-23 16:13:58 -04:00
"github.com/datarhei/core/v16/monitor/metric"
"github.com/datarhei/core/v16/restream"
2022-05-13 13:26:45 -04:00
)
type restreamCollector struct {
prefix string
r restream . Restreamer
restreamProcessDescr * metric . Description
restreamProcessStatesDescr * metric . Description
restreamProcessIODescr * metric . Description
restreamStatesDescr * metric . Description
}
func NewRestreamCollector ( r restream . Restreamer ) metric . Collector {
c := & restreamCollector {
prefix : "restream" ,
r : r ,
}
2022-09-08 07:50:53 -04:00
c . restreamProcessDescr = metric . NewDesc ( "restream_process" , "Current process values by name" , [ ] string { "processid" , "state" , "order" , "name" } )
c . restreamProcessStatesDescr = metric . NewDesc ( "restream_process_states" , "Current process state" , [ ] string { "processid" , "state" } )
c . restreamProcessIODescr = metric . NewDesc ( "restream_io" , "Current process IO values by name" , [ ] string { "processid" , "type" , "id" , "address" , "index" , "stream" , "media" , "name" } )
2023-04-11 08:52:27 -04:00
c . restreamStatesDescr = metric . NewDesc ( "restream_state" , "Summarized current process states" , [ ] string { "state" } )
2022-05-13 13:26:45 -04:00
return c
}
func ( c * restreamCollector ) Prefix ( ) string {
return c . prefix
}
func ( c * restreamCollector ) Describe ( ) [ ] * metric . Description {
return [ ] * metric . Description {
c . restreamProcessDescr ,
c . restreamProcessStatesDescr ,
c . restreamProcessIODescr ,
c . restreamStatesDescr ,
}
}
func ( c * restreamCollector ) Collect ( ) metric . Metrics {
metrics := metric . NewMetrics ( )
value := float64 ( 0 )
states := map [ string ] float64 {
"failed" : 0 ,
"finished" : 0 ,
"finishing" : 0 ,
"killed" : 0 ,
"running" : 0 ,
"starting" : 0 ,
}
2022-08-17 00:55:44 -04:00
ids := c . r . GetProcessIDs ( "" , "" )
2022-05-13 13:26:45 -04:00
for _ , id := range ids {
state , _ := c . r . GetProcessState ( id )
if state == nil {
continue
}
proc , _ := c . r . GetProcess ( id )
if proc == nil {
continue
}
states [ state . State ] ++
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Progress . Frame ) , id , state . State , state . Order , "frame" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Progress . FPS ) , id , state . State , state . Order , "fps" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Progress . Speed ) , id , state . State , state . Order , "speed" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , state . Progress . Quantizer , id , state . State , state . Order , "q" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Progress . Size ) , id , state . State , state . Order , "size" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , state . Progress . Time , id , state . State , state . Order , "time" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Progress . Drop ) , id , state . State , state . Order , "drop" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Progress . Dup ) , id , state . State , state . Order , "dup" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Progress . Packet ) , id , state . State , state . Order , "packet" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , state . Progress . Bitrate , id , state . State , state . Order , "bitrate" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , state . CPU , id , state . State , state . Order , "cpu" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( state . Memory ) , id , state . State , state . Order , "memory" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , state . Duration , id , state . State , state . Order , "uptime" ) )
if proc . Config != nil {
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , proc . Config . LimitCPU , id , state . State , state . Order , "cpu_limit" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessDescr , float64 ( proc . Config . LimitMemory ) , id , state . State , state . Order , "memory_limit" ) )
}
metrics . Add ( metric . NewValue ( c . restreamProcessStatesDescr , float64 ( state . States . Failed ) , id , "failed" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessStatesDescr , float64 ( state . States . Finished ) , id , "finished" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessStatesDescr , float64 ( state . States . Finishing ) , id , "finishing" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessStatesDescr , float64 ( state . States . Killed ) , id , "killed" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessStatesDescr , float64 ( state . States . Running ) , id , "running" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessStatesDescr , float64 ( state . States . Starting ) , id , "starting" ) )
for i := range state . Progress . Input {
io := & state . Progress . Input [ i ]
index := strconv . FormatUint ( io . Index , 10 )
stream := strconv . FormatUint ( io . Stream , 10 )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Frame ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "frame" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . FPS ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "fps" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Packet ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "packet" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . PPS ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "pps" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Size ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "size" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Bitrate ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "bitrate" ) )
if io . AVstream != nil {
a := io . AVstream
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( a . Queue ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "avstream_queue" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( a . Dup ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "avstream_dup" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( a . Drop ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "avstream_drop" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( a . Enc ) , id , "input" , io . ID , io . Address , index , stream , io . Type , "avstream_enc" ) )
value = 0
if a . Looping {
value = 1
}
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , value , id , "input" , io . ID , io . Address , index , stream , io . Type , "avstream_looping" ) )
value = 0
if a . Duplicating {
value = 1
}
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , value , id , "input" , io . ID , io . Address , index , stream , io . Type , "avstream_duplicating" ) )
}
}
for i := range state . Progress . Output {
io := & state . Progress . Output [ i ]
index := strconv . FormatUint ( io . Index , 10 )
stream := strconv . FormatUint ( io . Stream , 10 )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Frame ) , id , "output" , io . ID , io . Address , index , stream , io . Type , "frame" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . FPS ) , id , "output" , io . ID , io . Address , index , stream , io . Type , "fps" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Packet ) , id , "output" , io . ID , io . Address , index , stream , io . Type , "packet" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . PPS ) , id , "output" , io . ID , io . Address , index , stream , io . Type , "pps" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Size ) , id , "output" , io . ID , io . Address , index , stream , io . Type , "size" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Bitrate ) , id , "output" , io . ID , io . Address , index , stream , io . Type , "bitrate" ) )
metrics . Add ( metric . NewValue ( c . restreamProcessIODescr , float64 ( io . Quantizer ) , id , "output" , io . ID , io . Address , index , stream , io . Type , "q" ) )
}
}
for state , value := range states {
metrics . Add ( metric . NewValue ( c . restreamStatesDescr , value , state ) )
}
return metrics
}
func ( c * restreamCollector ) Stop ( ) { }