Browse Source

username and password support for hls + finishing up retry stuff

develop
forest 2 months ago
parent
commit
226eb89201
1 changed files with 212 additions and 196 deletions
  1. +212
    -196
      core/directhls/directhls.go

+ 212
- 196
core/directhls/directhls.go View File

@ -82,259 +82,275 @@ func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster),
_sambaHostPort = fmt.Sprintf("%s:%s", directHLSInputURL.Host, connectPort)
for {
(func() {
tcpConnection, err := net.Dial("tcp", _sambaHostPort)
if err != nil {
log.Panicf(
"owncast can't stream because it could not dial '%s' over TCP for direct HLS input: %s",
_sambaHostPort, err,
)
}
defer tcpConnection.Close()
tcpConnection, err := net.Dial("tcp", _sambaHostPort)
if err != nil {
log.Errorf(
"owncast can't stream because it could not dial '%s' over TCP for direct HLS input: %s",
_sambaHostPort, err,
)
return
}
defer tcpConnection.Close()
smbDialer := &smb2.Dialer{
Initiator: &smb2.NTLMInitiator{
User: "guest",
Password: "",
},
}
// https://www.windowscentral.com/how-create-guest-account-windows-10
// TODO make username and password configurable for windows
smbConnection, err := smbDialer.Dial(tcpConnection)
if err != nil {
log.Panicf(
"owncast can't stream because it couldn't initiate a samba session with '%s' for direct HLS input. (TCP connection established, but samba protocol failed): %s",
_sambaHostPort, err,
)
}
defer smbConnection.Logoff()
username := directHLSInputURL.User.Username()
password, hasPassword := directHLSInputURL.User.Password()
if !hasPassword {
password = ""
}
// empty username is not supported by this smb2 client
if username == "" {
username = "guest"
}
smbDialer := &smb2.Dialer{
Initiator: &smb2.NTLMInitiator{
User: username,
Password: password,
},
}
if _sambaShareName == "" {
shareNames, err := smbConnection.ListSharenames()
smbConnection, err := smbDialer.Dial(tcpConnection)
if err != nil {
log.Panicf(
"owncast can't stream because it connected to the samba server '%s' for direct HLS input, but hit an error trying to list the samba share names. You could try putting the share name into the directHLSInputURL",
_sambaHostPort,
log.Errorf(
"owncast can't stream because it couldn't initiate a samba session with '%s' for direct HLS input. (TCP connection established, but samba protocol failed): %s",
_sambaHostPort, err,
)
return
}
for _, name := range shareNames {
if (strings.ToLower(name) == "public" && _sambaShareName == "") || strings.ToLower(name) == "owncast" {
_sambaShareName = name
defer smbConnection.Logoff()
if _sambaShareName == "" {
shareNames, err := smbConnection.ListSharenames()
if err != nil {
log.Errorf(
"owncast can't stream because it connected to the samba server '%s' for direct HLS input, but hit an error trying to list the samba share names. You could try putting the share name into the directHLSInputURL",
_sambaHostPort,
)
return
}
for _, name := range shareNames {
if (strings.ToLower(name) == "public" && _sambaShareName == "") || strings.ToLower(name) == "owncast" {
_sambaShareName = name
}
}
}
}
if _sambaShareName == "" {
log.Panicf(
"owncast can't stream because it didn't find a samba share named 'public' or 'owncast' on '%s' for direct HLS input",
_sambaHostPort,
)
}
sambaShare, err := smbConnection.Mount(_sambaShareName)
if err != nil {
var allShareNames string
shareNames, err := smbConnection.ListSharenames()
if err == nil {
allShareNames = strings.Join(shareNames, ",\n")
} else {
allShareNames = fmt.Sprintf("<error occurred obtaining share names: %s>", err)
if _sambaShareName == "" {
log.Errorf(
"owncast can't stream because it didn't find a samba share named 'public' or 'owncast' on '%s' for direct HLS input",
_sambaHostPort,
)
return
}
log.Errorf(
"owncast can't stream because it couldn't mount the samba share named '%s' on '%s' for direct HLS input: %s. The existing shares are named: \n\n%s",
_sambaShareName, _sambaHostPort, err, allShareNames,
)
continue
}
// this samba client doesnt like leading slashes in paths.
_sambaFolderPath = strings.TrimPrefix(_sambaFolderPath, "/")
fileInfos, err := sambaShare.ReadDir(_sambaFolderPath)
if err != nil {
log.Errorf(
"owncast can't stream because it couldn't find the folder '%s' on the samba share named '%s' on '%s' for direct HLS input: %s",
_sambaFolderPath, _sambaShareName, _sambaHostPort, err,
)
sambaShare.Umount()
continue
}
log.Printf("mounted the folder '%s' on the samba share '%s' on server '%s' for direct HLS input", _sambaFolderPath, _sambaShareName, _sambaHostPort)
var mostRecentModTime time.Time
for _, fileInfo := range fileInfos {
modTime := fileInfo.ModTime()
if modTime.After(mostRecentModTime) {
mostRecentModTime = modTime
sambaShare, err := smbConnection.Mount(_sambaShareName)
if err != nil {
var allShareNames string
shareNames, err := smbConnection.ListSharenames()
if err == nil {
allShareNames = strings.Join(shareNames, ",\n")
} else {
allShareNames = fmt.Sprintf("<error occurred obtaining share names: %s>", err)
}
log.Errorf(
"owncast can't stream because it couldn't mount the samba share named '%s' on '%s' for direct HLS input: %s. The existing shares are named: \n\n%s",
_sambaShareName, _sambaHostPort, err, allShareNames,
)
return
}
}
defer sambaShare.Umount()
for {
time.Sleep(time.Second * 2)
// this samba client doesnt like leading slashes in paths.
_sambaFolderPath = strings.TrimPrefix(_sambaFolderPath, "/")
fileName := path.Join(_sambaFolderPath, "stream.m3u8")
fileInfo, err := sambaShare.Stat(fileName)
modTimeNow := mostRecentModTime
fileInfos, err := sambaShare.ReadDir(_sambaFolderPath)
if err != nil {
log.Errorf(
"couldn't find the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
"owncast can't stream because it couldn't find the folder '%s' on the samba share named '%s' on '%s' for direct HLS input: %s",
_sambaFolderPath, _sambaShareName, _sambaHostPort, err,
)
break
} else {
modTimeNow = fileInfo.ModTime()
return
}
if modTimeNow.After(mostRecentModTime) {
if _setStreamAsConnected != nil {
_setStreamAsConnected()
}
log.Printf("mounted the folder '%s' on the samba share '%s' on server '%s' for direct HLS input", _sambaFolderPath, _sambaShareName, _sambaHostPort)
// TODO how much of this can be pulled from the HLS video stream ?
// TODO how much of this is necessary for the system to work?
_setBroadcaster(models.Broadcaster{
RemoteAddr: _sambaHostPort,
Time: time.Now(),
StreamDetails: models.InboundStreamDetails{},
})
streamStartedTimeBuffer := make([]byte, 8)
binary.BigEndian.PutUint64(streamStartedTimeBuffer, uint64(time.Now().Unix()))
truncatedStreamStartedTimeBuffer := []byte{}
for _, bite := range streamStartedTimeBuffer {
if bite != byte(255) && bite != byte(0) {
truncatedStreamStartedTimeBuffer = append(truncatedStreamStartedTimeBuffer, bite)
}
var mostRecentModTime time.Time
for _, fileInfo := range fileInfos {
modTime := fileInfo.ModTime()
if modTime.After(mostRecentModTime) {
mostRecentModTime = modTime
}
_streamStartedTime = base64.RawURLEncoding.EncodeToString(truncatedStreamStartedTimeBuffer)
syncedSegments := map[string]bool{}
log.Printf(
"now streaming files from '%s' on the samba share '%s' on server '%s' for direct HLS!! (stream id: %s)",
_sambaFolderPath, _sambaShareName, _sambaHostPort, _streamStartedTime,
)
}
//
serverTimeLastTimeFilesWereUpdated := time.Now()
for {
for {
time.Sleep(time.Second * 2)
fileName := path.Join(_sambaFolderPath, "stream.m3u8")
fileInfo, err := sambaShare.Stat(fileName)
modTimeNow := mostRecentModTime
if err != nil {
log.Errorf(
"couldn't find the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
break
} else {
modTimeNow = fileInfo.ModTime()
}
// if it has been 10 seconds without any updates to files in the share, we should consider the stream disconnected.
if time.Now().After(serverTimeLastTimeFilesWereUpdated.Add(time.Second * 10)) {
_setStreamAsDisconnected()
break
if modTimeNow.After(mostRecentModTime) {
if _setStreamAsConnected != nil {
_setStreamAsConnected()
}
fileName := path.Join(_sambaFolderPath, "stream.m3u8")
fileInfo, err := sambaShare.Stat(fileName)
if err != nil {
log.Errorf(
"couldn't stat the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
_setStreamAsDisconnected()
break
// TODO how much of this can be pulled from the HLS video stream ?
// TODO how much of this is necessary for the system to work?
_setBroadcaster(models.Broadcaster{
RemoteAddr: _sambaHostPort,
Time: time.Now(),
StreamDetails: models.InboundStreamDetails{},
})
streamStartedTimeBuffer := make([]byte, 8)
binary.BigEndian.PutUint64(streamStartedTimeBuffer, uint64(time.Now().Unix()))
truncatedStreamStartedTimeBuffer := []byte{}
for _, bite := range streamStartedTimeBuffer {
if bite != byte(255) && bite != byte(0) {
truncatedStreamStartedTimeBuffer = append(truncatedStreamStartedTimeBuffer, bite)
}
}
_streamStartedTime = base64.RawURLEncoding.EncodeToString(truncatedStreamStartedTimeBuffer)
if fileInfo.ModTime().After(mostRecentModTime) {
syncedSegments := map[string]bool{}
file, err := sambaShare.Open(fileName)
if err != nil {
log.Errorf(
"couldn't open the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
log.Printf(
"now streaming files from '%s' on the samba share '%s' on server '%s' for direct HLS!! (stream id: %s)",
_sambaFolderPath, _sambaShareName, _sambaHostPort, _streamStartedTime,
)
//
serverTimeLastTimeFilesWereUpdated := time.Now()
for {
// if it has been 10 seconds without any updates to files in the share, we should consider the stream disconnected.
if time.Now().After(serverTimeLastTimeFilesWereUpdated.Add(time.Second * 10)) {
_setStreamAsDisconnected()
break
}
playlistBytes, err := io.ReadAll(file)
fileName := path.Join(_sambaFolderPath, "stream.m3u8")
fileInfo, err := sambaShare.Stat(fileName)
if err != nil {
log.Errorf(
"couldn't read the file '%s' on share '%s' on server '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
"couldn't stat the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
_setStreamAsDisconnected()
break
}
lines := strings.Split(string(playlistBytes), "\n")
outputLines := make([]string, len(lines))
mentionedSegments := []string{}
for i, line := range lines {
matches := _streamSegmentRegex.FindStringSubmatch(line)
if matches != nil && len(matches) == 3 {
outputLines[i] = fmt.Sprintf("%s-%s-%s.ts", matches[1], _streamStartedTime, matches[2])
mentionedSegments = append(mentionedSegments, line)
} else {
outputLines[i] = line
}
}
waitGroup1 := new(sync.WaitGroup)
if fileInfo.ModTime().After(mostRecentModTime) {
for _, fileName := range mentionedSegments {
fileName := fileName
if !syncedSegments[fileName] {
syncedSegments[fileName] = true
file, err := sambaShare.Open(fileName)
if err != nil {
log.Errorf(
"couldn't open the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
_setStreamAsDisconnected()
break
}
waitGroup1.Add(1)
go (func(fileName string, waitGroup1 *sync.WaitGroup) {
playlistBytes, err := io.ReadAll(file)
if err != nil {
log.Errorf(
"couldn't read the file '%s' on share '%s' on server '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
_setStreamAsDisconnected()
break
}
lines := strings.Split(string(playlistBytes), "\n")
outputLines := make([]string, len(lines))
mentionedSegments := []string{}
for i, line := range lines {
matches := _streamSegmentRegex.FindStringSubmatch(line)
if matches != nil && len(matches) == 3 {
outputLines[i] = fmt.Sprintf("%s-%s-%s.ts", matches[1], _streamStartedTime, matches[2])
mentionedSegments = append(mentionedSegments, line)
} else {
outputLines[i] = line
}
}
file, err := sambaShare.Open(fileName)
if err != nil {
log.Errorf(
"couldn't open the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
waitGroup1 := new(sync.WaitGroup)
for _, fileName := range mentionedSegments {
fileName := fileName
if !syncedSegments[fileName] {
syncedSegments[fileName] = true
waitGroup1.Add(1)
go (func(fileName string, waitGroup1 *sync.WaitGroup) {
file, err := sambaShare.Open(fileName)
if err != nil {
log.Errorf(
"couldn't open the file '%s' on the samba share named '%s' on '%s' for direct HLS input: %s. Stream wont work if this error keeps happening",
fileName, _sambaShareName, _sambaHostPort, err,
)
waitGroup1.Done()
return
}
postFileToHLSHandler(
_streamSegmentRegex.ReplaceAllString(file.Name(), fmt.Sprintf("$1-%s-$2.ts", _streamStartedTime)),
file,
waitGroup1,
)
waitGroup1.Done()
return
}
postFileToHLSHandler(
_streamSegmentRegex.ReplaceAllString(file.Name(), fmt.Sprintf("$1-%s-$2.ts", _streamStartedTime)),
file,
waitGroup1,
)
})(fileName, waitGroup1)
})(fileName, waitGroup1)
}
}
}
timeBeforeSync := time.Now()
// First wait for the new video segments to be fully uploaded.
waitGroup1.Wait()
timeBeforeSync := time.Now()
// First wait for the new video segments to be fully uploaded.
waitGroup1.Wait()
waitGroup2 := new(sync.WaitGroup)
waitGroup2 := new(sync.WaitGroup)
waitGroup2.Add(1)
go postFileToHLSHandler(
"stream.m3u8",
bytes.NewBuffer([]byte(strings.Join(outputLines, "\n"))),
waitGroup2,
)
waitGroup2.Add(1)
go postFileToHLSHandler(
"stream.m3u8",
bytes.NewBuffer([]byte(strings.Join(outputLines, "\n"))),
waitGroup2,
)
// Then wait for the HLS playlist to be uploaded.
waitGroup2.Wait()
// Then wait for the HLS playlist to be uploaded.
waitGroup2.Wait()
syncDuration := time.Since(timeBeforeSync)
syncDuration := time.Since(timeBeforeSync)
mostRecentModTime = fileInfo.ModTime()
serverTimeLastTimeFilesWereUpdated = time.Now()
mostRecentModTime = fileInfo.ModTime()
serverTimeLastTimeFilesWereUpdated = time.Now()
if syncDuration < time.Second {
time.Sleep(time.Second - syncDuration)
} else {
log.Errorf("syncing hls segments took %s, expected it to take less than 1 second.", syncDuration)
}
if syncDuration < time.Second {
time.Sleep(time.Second - syncDuration)
} else {
log.Errorf("syncing hls segments took %s, expected it to take less than 1 second.", syncDuration)
// the playlist file was not updated... wait & poll again.
time.Sleep(time.Second)
}
} else {
// the playlist file was not updated... wait & poll again.
time.Sleep(time.Second)
}
}
time.Sleep(time.Second * 4)
}
time.Sleep(time.Second * 4)
}
sambaShare.Umount()
})() // end of imediately invoked function expression used with defer
time.Sleep(_retryInterval)
if _retryInterval < time.Minute {


Loading…
Cancel
Save