2015-10-27

Communication between a PowerShell script and Jobs

I had the interesting task of collecting and displaying runtime metrics during a benchmark. Since the data was clearly divided between OS-level and process-level metrics, I opted for separate background jobs, so that neither task could stall the other. However, this meant I had to establish communication between the main script and the jobs, which is tricky in PowerShell (see this bug for example). My first thoughts were named pipes or files (since I have a small, ever-changing set of data). Further investigation revealed 4 ways of accomplishing this:
  1. Using files
    • pros:
      • Tried and tested; works everywhere.
      • Intuitive.
      • File management routines are mature.
    • cons:
      • Resource lock/leak possible.
      • Not really fit for large amounts of data.
      • Local machine only.
  2. Using named pipes
    • pros:
      • Tried and tested; works everywhere.
      • Works across the network.
    • cons:
      • Complicated to get right.
      • Blocking: the pipe has to be consumed before proceeding.
      • Async mode is even more complicated to manage.
      • One must ensure a unique pipe name.
      • No easy way to clean up if things go wrong.
  3. Using custom (Engine)Events
    • pros:
      • "Proper" Windows way.
      • No "weird" code in script.
    • cons:
      • Hard to control; not fit for all scenarios.
  4. Using UDP/IP
    • pros:
      • Fast.
      • Fit for purpose (packet loss is acceptable).
    • cons:
      • Requires finding a free ephemeral port.
      • Needs an additional transport for the initial port negotiation.

Using files:

       
$job1 = Start-Job -ScriptBlock {
...
    #Permanent value.
    $job1out = (dir Env:\TEMP).Value + '\job1out.txt'
    $job1in = (dir Env:\TEMP).Value + '\job1in.txt'
    if (Test-Path $job1in) { <# Ready to go. #> }

    do {
        #Do the work, pipe output to file:
...  
            Select-Object -First 20 | FT * -Auto 1> $job1out
}

$job2 = Start-Job -ScriptBlock {
    #Permanent values.
    $job2out = (dir Env:\TEMP).Value + '\job2out.txt'
    $job2in = (dir Env:\TEMP).Value + '\job2in.txt'
    if (Test-Path $job2in) { <# Ready to go. #> }

    do {
        #Do the work
...  
        #Utilize fast FS classes to output the result:
        $stream = [System.IO.StreamWriter] ($job2out)
        $stream.WriteLine("")
        $stream.Close()
... }
}

#Consolidate in main script code:
$job1out = (dir Env:\TEMP).Value + '\job1out.txt'
$job2out = (dir Env:\TEMP).Value + '\job2out.txt'
do {
    $l = Get-Content $job1out -ErrorAction SilentlyContinue
    $tc = Get-Item $job1out -ErrorAction SilentlyContinue
    #Make sure to display only fresh data.
    if (($l) -and ($tc) -and ($tc.LastWriteTime -gt $lastTimeHeader)) {
        $lastTimeHeader = Get-Date
    } else { $l = $null }

    $l1 = Get-Content $job2out -ErrorAction SilentlyContinue
    $tc = Get-Item $job2out -ErrorAction SilentlyContinue
    #Make sure to display only fresh data.
    if (($l1) -and ($tc) -and ($tc.LastWriteTime -gt $lastTimeProc)) {
        $lastTimeProc = Get-Date
    } else { $l1 = $null }
...
}
#Do the cleanup:
    Stop-Job -Job $job1
    $null = Receive-Job $job1
    Remove-Job $job1

    Stop-Job -Job $job2
    $null = Receive-Job $job2
    Remove-Job $job2
    
Gotcha: A background job is not aware of session state (say, $PSScriptRoot), so you need to rely on machine-wide values such as $Env:TEMP.
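
If a job does need a session value, it can be passed in explicitly via -ArgumentList. A minimal sketch (the parameter name is my own):

$job = Start-Job -ArgumentList $PSScriptRoot -ScriptBlock {
    param($ScriptRoot)
    #The caller's $PSScriptRoot arrives in the job as $ScriptRoot:
    Join-Path $ScriptRoot 'job1out.txt'
}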

Using named pipes (unfit for my particular use-case):

       
#Create 2 pipe servers; one for each background job.
$pipeH = new-object System.IO.Pipes.NamedPipeServerStream 'pipe1','In', 1, "Message"
$srH = new-object System.IO.StreamReader $pipeH
#--2nd pipe has 2-way communication
$pipeT = new-object System.IO.Pipes.NamedPipeServerStream 'pipe2','InOut', 1, "Message"
$srT = new-object System.IO.StreamReader $pipeT
$swT = new-object System.IO.StreamWriter $pipeT
#--
#Start Job1:
$job1 = Start-Job -ScriptBlock {
...
    #Permanent value.
    $pipeH1 = new-object System.IO.Pipes.NamedPipeClientStream '.', 'pipe1','Out'
    $pipeH1.Connect()
    $swH = new-object System.IO.StreamWriter $pipeH1
    $swH.AutoFlush = $true

    do {
        #Do the work, pipe output to StreamWriter:
...  
        $swH.WriteLine($ln0)
        $swH.WriteLine("")

}

#In main code, wait for traffic to start:
$pipeH.WaitForConnection()
cls
#Say we read 11 lines.
$l = $srH.ReadLine()
if ($l -ne $null) {
  $l
  for ($i = 1; $i -le 10; $i++)
  { 
    $l = $srH.ReadLine()
    if ($l -ne $null) {
      $l
    }      
  } 
}

#Start Job2:
$job2 = Start-Job -ScriptBlock {
    #Permanent values.
    $pipeT = new-object System.IO.Pipes.NamedPipeClientStream '.', 'pipe2','InOut'
    $pipeT.Connect()
    $srT = new-object System.IO.StreamReader $pipeT
    $swT = new-object System.IO.StreamWriter $pipeT
    $swT.AutoFlush = $true #AutoFlush belongs to the writer; StreamReader has no such property.

    do {
        #Do the work, pipe output to StreamWriter:
...  
            Select-Object -First 20 | FT * -Auto | Out-String | ForEach-Object { $swT.WriteLine($_) } #1> redirects to files only; write to the StreamWriter instead.
...
            #Check communication:
            if (($tmp = $srT.ReadLine()) -ne $null) {...}
   }
}

#Finish setting things up:
$swT.AutoFlush = $true
$pipeT.WaitForConnection()
#Say we read 27 lines from 2nd job.
$l = $srT.ReadLine()
if ($l -ne $null) {
  $l
  for ($i = 1; $i -le 26; $i++)
  { 
    $l = $srT.ReadLine()
    if ($l -ne $null) {
      $l
    }      
  } 
}

#And we send some info to 2nd job:
$swT.WriteLine('Something')

#You can read in loop too but make sure to allow for flow control between processes:
while (($tmp= $srT.ReadLine()) -ne 'something') 
{
  ...
}

#Do the cleanup:
    Stop-Job -Job $job1
    $null = Receive-Job $job1
    Remove-Job $job1

    Stop-Job -Job $job2
    $null = Receive-Job $job2
    Remove-Job $job2

    #Close readers/writers first, then the underlying pipes:
    $srH.Close()
    $srH.Dispose()
    $pipeH.Close()
    $pipeH.Dispose()
    $srT.Close()
    $srT.Dispose()
    $swT.Close()
    $swT.Dispose()
    $pipeT.Close()
    $pipeT.Dispose()

   
Gotcha: NamedPipeServerStream/NamedPipeClientStream have many constructors. Please check the Server and Client documentation.
A good read on the subject: Georg Begerow, MSDN.
Gotcha: You can also work without streams entirely (i.e. just with the pipe object):
       
$enc = [system.Text.Encoding]::Default
$msg = $enc.GetBytes("Message");
$pipeSort.Write($msg, 0, $msg.Length)
$pipeSort.WaitForPipeDrain()
$pipeSort.Flush()
--
do
{
    $SortBy += [char]$pipeT.ReadByte() #ReadByte() returns an Int32; cast it to [char].
}
while (!($pipeT.IsMessageComplete))
$pipeT.Flush()
   
Gotcha: I have found no way to make this work in my particular case (async is out of the question). That is, Read/Peek is always blocking, which prevents repeated updates. I guess this could be worked around by disposing and re-creating the client pipe, but that is suboptimal.
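
For completeness, a sketch of that workaround (variable names are mine; the server side would also have to Disconnect() and WaitForConnection() again after each round):

do {
    $pipe = new-object System.IO.Pipes.NamedPipeClientStream '.', 'pipe1', 'Out'
    $pipe.Connect()
    $sw = new-object System.IO.StreamWriter $pipe
    $sw.AutoFlush = $true
    $sw.WriteLine($update)        #$update stands for the current result.
    $sw.Dispose()                 #Also closes the underlying pipe.
    Start-Sleep -Milliseconds 500 #Throttle the reconnect loop.
} while ($keepRunning)            #$keepRunning is a hypothetical loop flag.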

Using custom (Engine)Events:

       
#Start Job1:
$job1 = Start-Job -ScriptBlock {
...
    Register-EngineEvent -SourceIdentifier Job1Message -Forward
    do {
        #Do the work, raise Event:
...  
        $null = New-Event -SourceIdentifier Job1Message -MessageData $your_result
    }

}

#Start Job2:
$job2 = Start-Job -ScriptBlock {
    Register-EngineEvent -SourceIdentifier Job2Message -Forward
    do {
        #Do the work, raise Event:
...  
        $null = New-Event -SourceIdentifier Job2Message -MessageData $your_result
    }
}

#In main code, I want to process results synchronously, 
#thus a bit weird approach instead of just using -Action {} scriptblock:
do {
    $OldErrorActionPreference = $ErrorActionPreference
    $ErrorActionPreference = "SilentlyContinue"
    $tmp = ''
    $tmp = (Get-Event -SourceIdentifier Job1Message).MessageData | Select -Last 1
    if ($tmp.Length) {
        #Do work with $tmp
    }
    $tmp = ''
    $tmp = (Get-Event -SourceIdentifier Job2Message).MessageData | Select -Last 1
    if ($tmp.Length) {
        #Do work with $tmp
    }
    $ErrorActionPreference = $OldErrorActionPreference
}
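
A possible alternative I did not use: Wait-Event blocks until an event arrives, which avoids both the polling and the $ErrorActionPreference juggling. A sketch:

$e = Wait-Event -SourceIdentifier Job1Message -Timeout 5
if ($e) {
    #Process $e.MessageData here, then drop the event from the queue:
    $e | Remove-Event
}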

#and the cleanup:
    $OldErrorActionPreference = $ErrorActionPreference
    $ErrorActionPreference = "SilentlyContinue"
    Remove-Event -SourceIdentifier "Job1Message"
    Remove-Event -SourceIdentifier "Job2Message"
    $ErrorActionPreference = $OldErrorActionPreference

    $job1 | Stop-Job -PassThru| Remove-Job
    $job2 | Stop-Job -PassThru| Remove-Job

   
Gotcha: Unfortunately, I was unable to make an event propagate from the main script to a background job using the same approach, which makes me think there are more things to learn here and/or there is a bug in PS v3 regarding event processing.
Gotcha: Register-EngineEvent ... -Action {} will start another (child) background job which runs in a different runspace, so the options for communication become scarce.


Using UDP/IP:

Based on PowerTip.
       
#To be executed in each script/job:
function Send-Text($Text='Sample Text', $Port=2500) {
    $endpoint = New-Object System.Net.IPEndPoint ([IPAddress]::Loopback,$Port)
    $udpclient= New-Object System.Net.Sockets.UdpClient
    $bytes=[Text.Encoding]::ASCII.GetBytes($Text)
    $bytesSent=$udpclient.Send($bytes,$bytes.length,$endpoint)
    $udpclient.Close()
}

function Start-Listen($Port=2500) {
    $endpoint = New-Object System.Net.IPEndPoint ([IPAddress]::Any,$Port)
    $udpclient= New-Object System.Net.Sockets.UdpClient $Port
    $content=$udpclient.Receive([ref]$endpoint)
    [Text.Encoding]::ASCII.GetString($content)
} 
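
Wired together, the two helpers above could be exercised like this (a sketch; the sleep is a crude way of letting the listener bind first):

$listener = Start-Job -ScriptBlock ${function:Start-Listen} -ArgumentList 2500
Start-Sleep -Seconds 1             #Give the job time to bind the port.
Send-Text -Text 'ping' -Port 2500
Receive-Job $listener -Wait        #Returns the decoded text.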

#To determine if Port is free:
[System.Net.NetworkInformation.IPGlobalProperties]::GetIPGlobalProperties().GetActiveUdpListeners() |
  Select Port -Unique | Where {$_.Port -eq $your_port} #Use GetActiveTcpListeners() for TCP ports.
#or
function IsLocalPortListening([int] $LPort) #[int16] cannot hold ports above 32767.
{
    <#
    .SYNOPSIS
    Checks whether a local TCP port is already listening; used to find a free
    port for deployment.
    #>
    Try 
    {
        $connection = (New-Object Net.Sockets.TcpClient)
        $connection.Connect("127.0.0.1",$LPort)
        $connection.Close()
        $connection = $null
        return "Listening"
    } Catch {
        $connection.Close()
        $connection = $null
        return "Not listening"
    }
}
   
Gotcha: This approach is unfit for my use-case since I would need another delivery mechanism to establish the port to communicate on.

Conclusion:

Given the task of communicating results from background jobs to the main script, the "writing to files" approach worked best: little overhead, a single mechanism, common cmdlets, and so on.
The events approach appeared to work most smoothly, except that I was not able to get a job to process an event raised from the main script. Since there were no subscribers to the events, I am also concerned about the quantity of events generated (2/sec).
Named pipes proved too much for me and I never got them to work as expected, while UDP/IP would require another delivery mechanism to sort out the initial port settings.


Happy coding!
