Browse Source

optimize samba client: only grab the playlist and new segments

instead of scanning the entire shared folder.
develop
forest 2 months ago
parent
commit
67a8d4be8b
3 changed files with 139 additions and 87 deletions
  1. +1
    -0
      Dockerfile
  2. +137
    -87
      core/directhls/directhls.go
  3. +1
    -0
      nsswitch.conf

+ 1
- 0
Dockerfile View File

@ -12,6 +12,7 @@ RUN apk add --no-cache ffmpeg ffmpeg-libs
WORKDIR /app
COPY webroot /app/webroot
COPY static /app/static
COPY nsswitch.conf /etc/nsswitch.conf
COPY --from=build /build/owncast /app/owncast
EXPOSE 8080 1935


+ 137
- 87
core/directhls/directhls.go View File

@ -9,7 +9,6 @@ import (
"net"
"net/http"
"net/url"
"os"
"path"
"regexp"
"strings"
@ -29,6 +28,7 @@ var _sambaFolderPath string
var _sambaShareName string
var _streamSegmentRegex *regexp.Regexp
var _streamStartedTime string
var _hlsSegmentServiceClient http.Client
var _setStreamAsConnected func()
var _setStreamAsDisconnected func()
@ -40,6 +40,9 @@ func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster),
_setStreamAsDisconnected = setStreamAsDisconnected
_setBroadcaster = setBroadcaster
_streamSegmentRegex = regexp.MustCompile(`(stream)[_-]?([^#\s]+)\.ts`)
_hlsSegmentServiceClient = http.Client{
Timeout: time.Second * 5,
}
directHLSInputURLString := data.GetDirectHLSInputURL()
@ -152,22 +155,29 @@ func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster),
log.Printf("mounted the folder '%s' on the samba share '%s' on server '%s' for direct HLS input", _sambaFolderPath, _sambaShareName, _sambaHostPort)
mostRecentModTime := getMostRecentModTime(fileInfos)
hlsSegmentServiceClient := http.Client{
Timeout: time.Second * 5,
var mostRecentModTime time.Time
for _, fileInfo := range fileInfos {
modTime := fileInfo.ModTime()
if modTime.After(mostRecentModTime) {
mostRecentModTime = modTime
}
}
for {
time.Sleep(time.Second * 2)
fileInfos, err := sambaShare.ReadDir(_sambaFolderPath)
fileName := path.Join(_sambaFolderPath, "stream.m3u8")
fileInfo, err := sambaShare.Stat(fileName)
modTimeNow := mostRecentModTime
if err != nil {
log.Errorf(
"couldn't find the folder '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
_sambaFolderPath, _sambaShareName, _sambaHostPort, err,
"couldn't find the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
} else {
modTimeNow = fileInfo.ModTime()
}
modTimeNow := getMostRecentModTime(fileInfos)
if modTimeNow.After(mostRecentModTime) {
if _setStreamAsConnected != nil {
_setStreamAsConnected()
@ -192,103 +202,121 @@ func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster),
}
_streamStartedTime = base64.RawURLEncoding.EncodeToString(truncatedStreamStartedTimeBuffer)
log.Printf("now streaming files from '%s' on the samba share '%s' on server '%s' for direct HLS!!", _sambaFolderPath, _sambaShareName, _sambaHostPort)
syncedSegments := map[string]bool{}
// TODO call SetStreamAsDisconnected() when the files stop getting updated for a couple seconds.
log.Printf(
"now streaming files from '%s' on the samba share '%s' on server '%s' for direct HLS!! (stream id: %s)",
_sambaFolderPath, _sambaShareName, _sambaHostPort, _streamStartedTime,
)
//
serverTimeLastTimeFilesWereUpdated := time.Now()
for {
// if it has been 10 seconds without any updates to files in the share, we should consider the stream disconnected.
if time.Now().After(mostRecentModTime.Add(time.Second * 15)) {
if time.Now().After(serverTimeLastTimeFilesWereUpdated.Add(time.Second * 10)) {
_setStreamAsDisconnected()
break
}
fileInfos, err := sambaShare.ReadDir(_sambaFolderPath)
fileName := path.Join(_sambaFolderPath, "stream.m3u8")
fileInfo, err := sambaShare.Stat(fileName)
if err != nil {
log.Errorf(
"couldn't find the folder '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
_sambaFolderPath, _sambaShareName, _sambaHostPort, err,
"couldn't stat the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
}
waitGroup := new(sync.WaitGroup)
if fileInfo.ModTime().After(mostRecentModTime) {
for _, fileInfo := range fileInfos {
if fileInfo.ModTime().After(mostRecentModTime) {
waitGroup.Add(1)
go (func(fileInfo os.FileInfo) {
defer waitGroup.Done()
file, err := sambaShare.Open(fileName)
if err != nil {
log.Errorf(
"couldn't open the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
}
file, err := sambaShare.Open(path.Join(_sambaFolderPath, fileInfo.Name()))
if err != nil {
log.Errorf(
"samba client unable to open file '%s/%s' on share '%s' on server '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
_sambaFolderPath, fileInfo.Name(), _sambaShareName, _sambaHostPort, err,
)
return
}
defer file.Close()
playlistBytes, err := io.ReadAll(file)
if err != nil {
log.Errorf(
"couldn't read the file '%s' on share '%s' on server '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
return
}
lines := strings.Split(string(playlistBytes), "\n")
outputLines := make([]string, len(lines))
mentionedSegments := []string{}
for i, line := range lines {
matches := _streamSegmentRegex.FindStringSubmatch(line)
if matches != nil && len(matches) == 3 {
outputLines[i] = fmt.Sprintf("%s-%s-%s.ts", matches[1], _streamStartedTime, matches[2])
mentionedSegments = append(mentionedSegments, line)
} else {
outputLines[i] = line
}
}
waitGroup1 := new(sync.WaitGroup)
var dataToSend io.Reader
if strings.HasSuffix(fileInfo.Name(), ".m3u8") {
playlistBytes, err := io.ReadAll(file)
for _, fileName := range mentionedSegments {
fileName := fileName
if !syncedSegments[fileName] {
syncedSegments[fileName] = true
waitGroup1.Add(1)
go (func(fileName string, waitGroup1 *sync.WaitGroup) {
file, err := sambaShare.Open(fileName)
if err != nil {
log.Errorf(
"samba client unable to read file '%s/%s' on share '%s' on server '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
_sambaFolderPath, fileInfo.Name(), _sambaShareName, _sambaHostPort, err,
"couldn't open the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
waitGroup1.Done()
return
}
dataToSend = bytes.NewBuffer(
_streamSegmentRegex.ReplaceAll(playlistBytes, []byte(fmt.Sprintf("$1-%s-$2.ts", _streamStartedTime))),
)
} else {
dataToSend = file
}
fileName := _streamSegmentRegex.ReplaceAllString(fileInfo.Name(), fmt.Sprintf("$1-%s-$2.ts", _streamStartedTime))
// TODO I have hardcoded stream ID 0 here...
hlsUploadRequest, err := http.NewRequest(
"PUT",
fmt.Sprintf("http://127.0.0.1:%s/0/%s", config.InternalHLSListenerPort, fileName),
dataToSend,
)
if err != nil {
log.Errorf(
"unable to create local server HTTP PUT request for HLS segment '%s': %s. Stream wont work if this error keeps happening",
fileInfo.Name(), err,
)
return
}
response, err := hlsSegmentServiceClient.Do(hlsUploadRequest)
if err != nil {
log.Errorf(
"local server HTTP PUT request for HLS segment '%s' failed: %s. Stream wont work if this error keeps happening",
fileInfo.Name(), err,
postFileToHLSHandler(
_streamSegmentRegex.ReplaceAllString(file.Name(), fmt.Sprintf("$1-%s-$2.ts", _streamStartedTime)),
file,
waitGroup1,
)
return
}
if response.StatusCode != http.StatusOK {
log.Errorf(
"local server HTTP PUT request for HLS segment '%s' returned HTTP %d: %s. Stream wont work if this error keeps happening",
fileInfo.Name(), response.StatusCode, response.Status,
)
}
})(fileInfo)
})(fileName, waitGroup1)
}
}
}
timeBeforeSync := time.Now()
waitGroup.Wait()
syncDuration := time.Since(timeBeforeSync)
timeBeforeSync := time.Now()
// First wait for the new video segments to be fully uploaded.
waitGroup1.Wait()
mostRecentModTime = getMostRecentModTime(fileInfos)
waitGroup2 := new(sync.WaitGroup)
waitGroup2.Add(1)
go postFileToHLSHandler(
"stream.m3u8",
bytes.NewBuffer([]byte(strings.Join(outputLines, "\n"))),
waitGroup2,
)
if syncDuration < time.Second {
time.Sleep(time.Second - syncDuration)
// Then wait for the HLS playlist to be uploaded.
waitGroup2.Wait()
syncDuration := time.Since(timeBeforeSync)
mostRecentModTime = fileInfo.ModTime()
serverTimeLastTimeFilesWereUpdated = time.Now()
if syncDuration < time.Second {
time.Sleep(time.Second - syncDuration)
} else {
log.Errorf("syncing hls segments took %s, expected it to take less than 1 second.", syncDuration)
}
} else {
// the playlist file was not updated... wait & poll again.
time.Sleep(time.Second)
}
}
}
@ -305,13 +333,35 @@ func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster),
}
}
func getMostRecentModTime(fileInfos []os.FileInfo) time.Time {
var mostRecentTime time.Time
for _, fileInfo := range fileInfos {
modTime := fileInfo.ModTime()
if modTime.After(mostRecentTime) {
mostRecentTime = modTime
}
func postFileToHLSHandler(fileName string, dataToSend io.Reader, waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
log.Traceln("HTTP PUT", fmt.Sprintf("http://127.0.0.1:%s/0/%s", config.InternalHLSListenerPort, fileName))
hlsUploadRequest, err := http.NewRequest(
"PUT",
fmt.Sprintf("http://127.0.0.1:%s/0/%s", config.InternalHLSListenerPort, fileName),
dataToSend,
)
if err != nil {
log.Errorf(
"unable to create local server HTTP PUT request for HLS segment '%s': %s. Stream wont work if this error keeps happening",
fileName, err,
)
return
}
response, err := _hlsSegmentServiceClient.Do(hlsUploadRequest)
if err != nil {
log.Errorf(
"local server HTTP PUT request for HLS segment '%s' failed: %s. Stream wont work if this error keeps happening",
fileName, err,
)
return
}
if response.StatusCode != http.StatusOK {
log.Errorf(
"local server HTTP PUT request for HLS segment '%s' returned HTTP %d: %s. Stream wont work if this error keeps happening",
fileName, response.StatusCode, response.Status,
)
}
return mostRecentTime
}

+ 1
- 0
nsswitch.conf View File

@ -0,0 +1 @@
hosts: files dns

Loading…
Cancel
Save