Browse Source

first test build with direct HLS streaming from a samba share

develop
forest 2 months ago
parent
commit
e67f711db5
11 changed files with 352 additions and 16 deletions
  1. +1
    -0
      .vscode/settings.json
  2. +9
    -7
      config/defaults.go
  3. +19
    -0
      controllers/admin/config.go
  4. +7
    -5
      controllers/admin/serverConfig.go
  5. +9
    -4
      core/core.go
  6. +17
    -0
      core/data/config.go
  7. +277
    -0
      core/directhls/directhls.go
  8. +1
    -0
      go.mod
  9. +6
    -0
      go.sum
  10. +3
    -0
      openapi.yaml
  11. +3
    -0
      router/router.go

+ 1
- 0
.vscode/settings.json View File

@ -16,6 +16,7 @@
"nolint",
"preact",
"rtmpserverport",
"directhlsinputurl",
"sqlite",
"videojs"
]

+ 9
- 7
config/defaults.go View File

@ -12,10 +12,11 @@ type Defaults struct {
Tags []string
PageBodyContent string
DatabaseFilePath string
WebServerPort int
RTMPServerPort int
StreamKey string
DatabaseFilePath string
WebServerPort int
RTMPServerPort int
DirectHLSInputURL string
StreamKey string
YPEnabled bool
YPServer string
@ -45,9 +46,10 @@ func GetDefaults() Defaults {
YPEnabled: false,
YPServer: "https://directory.owncast.online",
WebServerPort: 8080,
RTMPServerPort: 1935,
StreamKey: "abc123",
WebServerPort: 8080,
RTMPServerPort: 1935,
DirectHLSInputURL: "",
StreamKey: "abc123",
StreamVariants: []models.StreamOutputVariant{
{


+ 19
- 0
controllers/admin/config.go View File

@ -320,6 +320,25 @@ func SetRTMPServerPort(w http.ResponseWriter, r *http.Request) {
controllers.WriteSimpleResponse(w, true, "rtmp port set")
}
// SetDirectHLSInputURL will handle the web config request to set the inbound RTMP port.
func SetDirectHLSInputURL(w http.ResponseWriter, r *http.Request) {
if !requirePOST(w, r) {
return
}
configValue, success := getValueFromRequest(w, r)
if !success {
return
}
if err := data.SetDirectHLSInputURL(configValue.Value.(string)); err != nil {
controllers.WriteSimpleResponse(w, false, err.Error())
return
}
controllers.WriteSimpleResponse(w, true, "direct hls input url set")
}
// SetServerURL will handle the web config request to set the full server URL.
func SetServerURL(w http.ResponseWriter, r *http.Request) {
if !requirePOST(w, r) {


+ 7
- 5
controllers/admin/serverConfig.go View File

@ -43,11 +43,12 @@ func GetServerConfig(w http.ResponseWriter, r *http.Request) {
NSFW: data.GetNSFW(),
CustomStyles: data.GetCustomStyles(),
},
FFmpegPath: ffmpeg,
StreamKey: data.GetStreamKey(),
WebServerPort: config.WebServerPort,
RTMPServerPort: data.GetRTMPPortNumber(),
ChatDisabled: data.GetChatDisabled(),
FFmpegPath: ffmpeg,
StreamKey: data.GetStreamKey(),
WebServerPort: config.WebServerPort,
RTMPServerPort: data.GetRTMPPortNumber(),
DirectHLSInputURL: data.GetDirectHLSInputURL(),
ChatDisabled: data.GetChatDisabled(),
VideoSettings: videoSettings{
VideoQualityVariants: videoQualityVariants,
LatencyLevel: data.GetStreamLatencyLevel().Level,
@ -76,6 +77,7 @@ type serverConfigAdminResponse struct {
StreamKey string `json:"streamKey"`
WebServerPort int `json:"webServerPort"`
RTMPServerPort int `json:"rtmpServerPort"`
DirectHLSInputURL string `json:"directHLSInputURL"`
S3 models.S3 `json:"s3"`
VideoSettings videoSettings `json:"videoSettings"`
LatencyLevel int `json:"latencyLevel"`


+ 9
- 4
core/core.go View File

@ -10,6 +10,7 @@ import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/directhls"
"github.com/owncast/owncast/core/rtmp"
"github.com/owncast/owncast/core/transcoder"
"github.com/owncast/owncast/models"
@ -66,11 +67,15 @@ func Start() error {
chat.Setup(ChatListenerImpl{})
// start the rtmp server
go rtmp.Start(setStreamAsConnected, setBroadcaster)
if data.GetDirectHLSInputURL() != "" {
go directhls.Start(setStreamAsConnected, setBroadcaster)
rtmpPort := data.GetRTMPPortNumber()
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
log.Infof("Polling for HLS segments at %s.", data.GetDirectHLSInputURL())
} else {
go rtmp.Start(setStreamAsConnected, setBroadcaster)
log.Infof("RTMP is accepting inbound streams on port %d.", data.GetRTMPPortNumber())
}
return nil
}


+ 17
- 0
core/data/config.go View File

@ -23,6 +23,7 @@ const serverNameKey = "server_name"
const serverURLKey = "server_url"
const httpPortNumberKey = "http_port_number"
const rtmpPortNumberKey = "rtmp_port_number"
const directHLSInputURLKey = "direct_hls_input_url"
const serverMetadataTagsKey = "server_metadata_tags"
const directoryEnabledKey = "directory_enabled"
const directoryRegistrationKeyKey = "directory_registration_key"
@ -211,6 +212,22 @@ func SetRTMPPortNumber(port float64) error {
return _datastore.SetNumber(rtmpPortNumberKey, port)
}
// GetDirectHLSInputURL will return the server RTMP port.
func GetDirectHLSInputURL() string {
url, err := _datastore.GetString(directHLSInputURLKey)
if err != nil {
log.Traceln(directHLSInputURLKey, err)
return config.GetDefaults().DirectHLSInputURL
}
return url
}
// SetDirectHLSInputURL will set the server RTMP port.
func SetDirectHLSInputURL(url string) error {
return _datastore.SetString(directHLSInputURLKey, url)
}
// GetServerMetadataTags will return the metadata tags.
func GetServerMetadataTags() []string {
tagsString, err := _datastore.GetString(serverMetadataTagsKey)


+ 277
- 0
core/directhls/directhls.go View File

@ -0,0 +1,277 @@
package directhls
import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync"
"time"
"github.com/hirochachacha/go-smb2"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
)
var _sambaHostPort string
var _sambaFolderPath string
var _sambaShareName string
var _setStreamAsConnected func()
var _setBroadcaster func(models.Broadcaster)
// Start starts the directhls service, polling for HLS segments at the DirectHLSInputURL.
func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster)) {
_setStreamAsConnected = setStreamAsConnected
_setBroadcaster = setBroadcaster
directHLSInputURLString := data.GetDirectHLSInputURL()
directHLSInputURL, err := url.Parse(directHLSInputURLString)
if err != nil {
log.Panicf(
"owncast can't stream because directHLSInputURL '%s' could not be parsed as a URL: %s",
directHLSInputURLString, err,
)
}
// TODO introduce retry logic, dont just panic when connections fail
// TODO support file:// and http:// protocols as well.
if strings.ToLower(directHLSInputURL.Scheme) == "smb" || strings.ToLower(directHLSInputURL.Scheme) == "samba" {
connectPort := "445"
if directHLSInputURL.Port() != "" {
connectPort = directHLSInputURL.Port()
}
pathPartsRaw := strings.Split(directHLSInputURL.Path, "/")
pathParts := []string{}
for _, part := range pathPartsRaw {
if strings.TrimSpace(part) != "" {
pathParts = append(pathParts, part)
}
}
if len(pathParts) > 0 {
_sambaShareName = pathParts[0]
}
if len(pathParts) > 1 {
_sambaFolderPath = path.Join(pathParts[1:]...)
}
_sambaHostPort = fmt.Sprintf("%s:%s", directHLSInputURL.Host, connectPort)
tcpConnection, err := net.Dial("tcp", _sambaHostPort)
if err != nil {
log.Panicf(
"owncast can't stream because it could not dial '%s' over TCP for direct HLS input: %s",
_sambaHostPort, err,
)
}
defer tcpConnection.Close()
smbDialer := &smb2.Dialer{
Initiator: &smb2.NTLMInitiator{
User: "guest",
Password: "",
},
}
smbConnection, err := smbDialer.Dial(tcpConnection)
if err != nil {
log.Panicf(
"owncast can't stream because it couldn't initiate a samba session with '%s' for direct HLS input. (TCP connection established, but samba protocol failed): %s",
_sambaHostPort, err,
)
}
defer smbConnection.Logoff()
if _sambaShareName == "" {
shareNames, err := smbConnection.ListSharenames()
if err != nil {
log.Panicf(
"owncast can't stream because it connected to the samba server '%s' for direct HLS input, but hit an error trying to list the samba share names. You could try putting the share name into the directHLSInputURL",
_sambaHostPort,
)
}
for _, name := range shareNames {
if (strings.ToLower(name) == "public" && _sambaShareName == "") || strings.ToLower(name) == "owncast" {
_sambaShareName = name
}
}
}
if _sambaShareName == "" {
log.Panicf(
"owncast can't stream because it didn't find a samba share named 'public' or 'owncast' on '%s' for direct HLS input",
_sambaHostPort,
)
}
sambaShare, err := smbConnection.Mount(_sambaShareName)
if err != nil {
var allShareNames string
shareNames, err := smbConnection.ListSharenames()
if err == nil {
allShareNames = strings.Join(shareNames, ",\n")
} else {
allShareNames = fmt.Sprintf("<error occurred obtaining share names: %s>", err)
}
log.Panicf(
"owncast can't stream because it couldn't mount the samba share named '%s' on '%s' for direct HLS input: %s. The existing shares are named: \n\n%s",
_sambaShareName, _sambaHostPort, err, allShareNames,
)
}
defer sambaShare.Umount()
if _sambaFolderPath == "" {
_sambaFolderPath = "/"
}
fileInfos, err := sambaShare.ReadDir(_sambaFolderPath)
if err != nil {
log.Panicf(
"owncast can't stream because it couldn't find the folder '%s' on the samba share named '%s' on '%s' for direct HLS input: %s",
_sambaFolderPath, _sambaShareName, _sambaHostPort, err,
)
}
log.Printf("mounted the folder '%s' on the samba share '%s' on server '%s' for direct HLS input", _sambaFolderPath, _sambaShareName, _sambaHostPort)
getMostRecentModTime := func(fileInfos []os.FileInfo) time.Time {
var mostRecentTime time.Time
for _, fileInfo := range fileInfos {
modTime := fileInfo.ModTime()
if modTime.After(mostRecentTime) {
mostRecentTime = modTime
}
}
return mostRecentTime
}
mostRecentModTime := getMostRecentModTime(fileInfos)
hlsSegmentServiceClient := http.Client{
Timeout: time.Second * 5,
}
for {
time.Sleep(time.Second * 5)
fileInfos, err := sambaShare.ReadDir(_sambaFolderPath)
if err != nil {
log.Errorf(
"couldn't find the folder '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
_sambaFolderPath, _sambaShareName, _sambaHostPort, err,
)
}
modTimeNow := getMostRecentModTime(fileInfos)
if modTimeNow.After(mostRecentModTime) {
if _setStreamAsConnected != nil {
_setStreamAsConnected()
}
// TODO how much of this can be pulled from the HLS video stream ?
// TODO how much of this is necessary for the system to work?
// broadcaster := models.Broadcaster{
// RemoteAddr: _sambaHostPort,
// Time: time.Now(),
// StreamDetails: models.InboundStreamDetails{
// Width: data.Width,
// Height: data.Height,
// VideoBitrate: int(data.VideoBitrate),
// VideoCodec: getVideoCodec(data.VideoCodec),
// VideoFramerate: data.VideoFramerate,
// AudioBitrate: int(data.AudioBitrate),
// AudioCodec: getAudioCodec(data.AudioCodec),
// Encoder: data.Encoder,
// VideoOnly: data.AudioCodec == nil,
// },
// }
// _setBroadcaster(broadcaster)
for {
fileInfos, err := sambaShare.ReadDir(_sambaFolderPath)
if err != nil {
log.Errorf(
"couldn't find the folder '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
_sambaFolderPath, _sambaShareName, _sambaHostPort, err,
)
}
waitGroup := new(sync.WaitGroup)
for _, fileInfo := range fileInfos {
if fileInfo.ModTime().After(mostRecentModTime) {
waitGroup.Add(1)
go (func(fileInfo os.FileInfo) {
defer waitGroup.Done()
file, err := sambaShare.Open(path.Join(_sambaFolderPath, fileInfo.Name()))
if err != nil {
log.Errorf(
"samba client unable to open file '%s/%s' on share '%s' on server '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
_sambaFolderPath, fileInfo.Name(), _sambaShareName, _sambaHostPort, err,
)
return
}
defer file.Close()
hlsUploadRequest, err := http.NewRequest(
"PUT",
fmt.Sprintf("http://127.0.0.1:%d/%s", config.InternalHLSListenerPort, fileInfo.Name()),
file,
)
if err != nil {
log.Errorf(
"unable to create local server HTTP PUT request for HLS segment '%s': %s. Stream wont work if this error keeps happening",
fileInfo.Name(), err,
)
return
}
response, err := hlsSegmentServiceClient.Do(hlsUploadRequest)
if err != nil {
log.Errorf(
"local server HTTP PUT request for HLS segment '%s' failed: %s. Stream wont work if this error keeps happening",
fileInfo.Name(), err,
)
return
}
if response.StatusCode != http.StatusOK {
log.Errorf(
"local server HTTP PUT request for HLS segment '%s' returned HTTP %d: %s. Stream wont work if this error keeps happening",
fileInfo.Name(), response.StatusCode, response.Status,
)
}
})(fileInfo)
}
}
timeBeforeSync := time.Now()
waitGroup.Wait()
syncDuration := time.Since(timeBeforeSync)
mostRecentModTime = getMostRecentModTime(fileInfos)
if syncDuration < time.Second {
time.Sleep(time.Second - syncDuration)
}
}
}
}
} else {
log.Errorf(
"owncast can't stream because directHLSInputURL '%s' is using the protocol %s which isn't supported yet. Try samba:// or smb:// instead.",
directHLSInputURLString, directHLSInputURL.Scheme,
)
return
}
}

+ 1
- 0
go.mod View File

@ -8,6 +8,7 @@ require (
github.com/aws/aws-sdk-go v1.38.44
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/grafov/m3u8 v0.11.1
github.com/hirochachacha/go-smb2 v1.0.10 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/lestrrat-go/strftime v1.0.4 // indirect


+ 6
- 0
go.sum View File

@ -9,6 +9,8 @@ github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/geoffgarside/ber v1.1.0 h1:qTmFG4jJbwiSzSXoNJeHcOprVzZ8Ulde2Rrrifu5U9w=
github.com/geoffgarside/ber v1.1.0/go.mod h1:jVPKeCbj6MvQZhwLYsGwaGI52oUorHoHKNecGT85ZCc=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/gobuffalo/here v0.6.0 h1:hYrd0a6gDmWxBM4TnrGw8mQg24iSVoIkHEk7FodQcBI=
@ -17,6 +19,8 @@ github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY=
github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c=
github.com/grafov/m3u8 v0.11.1 h1:igZ7EBIB2IAsPPazKwRKdbhxcoBKO3lO1UY57PZDeNA=
github.com/grafov/m3u8 v0.11.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080=
github.com/hirochachacha/go-smb2 v1.0.10 h1:fiSNyMOOlWzfdTVk6VtvxfDGqhjNDI2iYZjd/jdtmhk=
github.com/hirochachacha/go-smb2 v1.0.10/go.mod h1:8F1A4d5EZzrGu5R7PU163UcMRDJQl4FtcxjBfsY8TZE=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
@ -85,6 +89,8 @@ github.com/yuin/goldmark v1.3.7/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=


+ 3
- 0
openapi.yaml View File

@ -634,6 +634,9 @@ paths:
rtmpServerPort:
type: integer
description: The port the inbound RTMP broadcast should be sent to.
directhlsinputurl:
type: string
description: An experimental optional alternative input type to use with OBS (all video encoding done on streaming PC, owncast server just serves files and thats it)
s3:
$ref: "#/components/schemas/S3"
videoSettings:


+ 3
- 0
router/router.go View File

@ -175,6 +175,9 @@ func Start() error {
// Server rtmp port
http.HandleFunc("/api/admin/config/rtmpserverport", middleware.RequireAdminAuth(admin.SetRTMPServerPort))
// Server direct hls input url
http.HandleFunc("/api/admin/config/directhlsinputurl", middleware.RequireAdminAuth(admin.SetDirectHLSInputURL))
// Is server marked as NSFW
http.HandleFunc("/api/admin/config/nsfw", middleware.RequireAdminAuth(admin.SetNSFW))


Loading…
Cancel
Save