Multi-Processing Support 2

Status
Not open for further replies.

Glenn9999

One thing I've been able to play with lately is multi-processing support, that is, taking the presence of multiple CPUs into account. While I can't say much about "best practices" for making efficient use of multi-processing, I can describe the basics of what you need to do to take it into account. Call this a "preliminary FAQ" or a discussion, as you will; I thought I would share these things. Maybe the specifics can be shaped into a proper FAQ here.

Something that supports multi-processing generally implies supporting multi-threading. Multi-threaded apps are beyond the scope of what I intend to cover.

Multi-processing options are expressed in the OS through the setting of affinities. You can do this through Task Manager to indicate which CPUs you want an application to run on. Task Manager controls things at the process level.
You can also do this at the code level, as seen in the unit example. If you have access to a multi-core system running an NT-based OS (under 9X the only valid affinity is $01), you will see that the OS defaults to using any of the CPUs present.

At the code level, an affinity mask is a DWord treated as a bit array indicating which CPUs you want. You will see those bits defined as constants in the code below. A 0 bit indicates that the CPU in that position is not to be used, and a 1 bit indicates that it is. As described above, any of the valid CPUs can be selected. We can find out which are valid by getting the affinity mask for the process. The same API function also returns the allowable system affinity mask, meaning the system will only accept a subset of the mask that is returned. In other words, the system mask has one bit set for each CPU present. For convenience's sake, the sample provides a function that returns the number of CPUs on the system as a plain count (i.e. 4 instead of the mask $0F, binary 1111).
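To make the bit-array idea concrete, here is a minimal sketch in plain Pascal that touches no Windows API. The names TAffinityMask, CpuBit and CountCpus are hypothetical helpers made up for illustration; they are not part of the unit posted in this thread. CpuBit computes the same values as the CPUID table, and CountCpus turns a mask into a plain count.

```pascal
program MaskDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

type
  // Hypothetical alias for illustration; the posted unit uses DWord.
  TAffinityMask = Cardinal;

// Returns the single-bit mask for a 1-based CPU number (1..32).
function CpuBit(CpuNumber: Integer): TAffinityMask;
begin
  Result := TAffinityMask(1) shl (CpuNumber - 1);
end;

// Counts the 1 bits in a mask, i.e. how many CPUs the mask selects.
function CountCpus(Mask: TAffinityMask): Integer;
begin
  Result := 0;
  while Mask <> 0 do
  begin
    if (Mask and 1) <> 0 then
      Inc(Result);
    Mask := Mask shr 1;
  end;
end;

begin
  WriteLn(CpuBit(3) or CpuBit(4)); // prints 12 ($0C): CPU 3 and CPU 4 selected
  WriteLn(CountCpus($0F));         // prints 4: binary 1111 selects four CPUs
end.
```

Counting bits this way only tells you how many CPUs a mask selects; for the number of CPUs in the machine, the GetSystemInfo approach in the posted unit is the more direct route.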

Setting of the process affinity mask is illustrated in the code. This means presenting the process handle, along with an affinity mask. Using the defined constants will be more descriptive. This sets the current process to run on either CPU3 or CPU4.

Code:
SetProcInfo(CPUID[3] or CPUID[4]);

Affinities can also be set on specific threads. Setting this does not relate much to the main thread, since it is connected to the process. But if you create threads, then you can set them to run on a specific CPU. Valid thread affinities are a subset of the process affinity. In other words, I cannot create a thread to run on CPU 3 in a process which is only set to run on CPU 1 or CPU 2. This sample sets a created TThread descendant named MyThread to run on CPU 2:

Code:
SetThreadInfo(mythread.handle, CPUID[2]);
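The subset rule can be checked with plain bit arithmetic before calling the API. The following is a sketch only; IsValidThreadMask is a hypothetical helper, not something from the posted unit, and it assumes 32-bit masks held in a Cardinal.

```pascal
program SubsetDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

// True when ThreadMask selects at least one CPU and selects no CPU
// that is missing from ProcMask (hypothetical helper for illustration).
function IsValidThreadMask(ThreadMask, ProcMask: Cardinal): Boolean;
begin
  Result := (ThreadMask <> 0) and ((ThreadMask and not ProcMask) = 0);
end;

const
  ProcMask = $03; // process allowed on CPU 1 and CPU 2

begin
  // CPU 3 ($04) lies outside the process mask, so this is not valid.
  if IsValidThreadMask($04, ProcMask) then
    WriteLn('CPU 3: valid')
  else
    WriteLn('CPU 3: not valid');

  // CPU 2 ($02) lies inside the process mask, so this is valid.
  if IsValidThreadMask($02, ProcMask) then
    WriteLn('CPU 2: valid')
  else
    WriteLn('CPU 2: not valid');
end.
```

A check like this gives a clearer failure reason than the plain False you get back when the API call rejects the mask.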

Code Sample below:
Code:
unit multiproc;
interface
  uses windows;
  { presents code to manipulate affinity masks for current processes or current threads, developed in Delphi 3 }
  const
    CPUID: Array[1..32] of DWord =
    ($01, $02, $04, $08, $10, $20, $40, $80, $100, $200, $400, $800,
     $1000, $2000, $4000, $8000, $10000, $20000, $40000, $80000,
     $100000, $200000, $400000, $800000, $1000000, $2000000, $4000000,
     $8000000, $10000000, $20000000, $40000000, $80000000);

  procedure getprocinfo(var procaffmask, sysaffmask: DWord);
  function setprocinfo(procmask: DWord): boolean;
  function setthreadinfo(handle: THandle; value: DWord): boolean;
  function GetNumberOfProcessors : Integer;

implementation

  procedure getprocinfo(var procaffmask, sysaffmask: DWord);
  // get process affinity mask and allowable system mask for current process
    begin
      GetProcessAffinityMask(GetCurrentProcess, procaffmask, sysaffmask);
    end;

  function setprocinfo(procmask: DWord): boolean;
  // set process affinity mask for current process, boolean indicates success
    begin
      Result := SetProcessAffinityMask(GetCurrentProcess, procmask);
    end;

  function setthreadinfo(handle: THandle; value: DWord): boolean;
  // set affinity mask for the given thread handle.  Boolean indicates success
    begin
      // SetThreadAffinityMask returns the previous mask on success and 0 on failure
      Result := (SetThreadAffinityMask(handle, value) <> 0);
    end;

  function GetNumberOfProcessors : Integer;
  // returns the number of processors in the system
    var
      Info : TSystemInfo;
    begin
      GetSystemInfo(Info);
      result := Info.dwNumberOfProcessors;
    end;

end.

Measurement is not management.
 
Best Practices, or some observations I've already made. If anyone has any observations, please share. I'm still trying to learn how best to use this knowledge in coding.

1) Single-threaded apps tend to "float around" on the CPUs. My guess is that the OS spreads the load evenly and leaves the left-over CPU time for other processes. A single busy process seems to use about half of each CPU on a 2-CPU system. This seems to be the main advantage of upgrading to a multi-core system when running single-threaded apps.

If you switch the process to use a single CPU, it will use 100% of that CPU, and I notice no difference in times.

2) Multi-threaded apps that are not multi-processing aware tend to start out using all the CPUs to full capacity, but then get throttled down again to about 50% on a 2-CPU system. Using more threads places more of a processing load on the system and brings diminishing returns, since you give the OS more threads to switch between.

3) Specifically assigning single threads to single CPUs tends to provide the most sustained access to all the CPU resources, but again it depends on how long each of the threads lasts and how well you can split up the work.

4) I don't know where the break-even point is in terms of amount of work. Obviously, if a complete single-threaded process takes 1 or 2 seconds, dividing up the work and multi-threading it probably doesn't provide much of a return. I don't know how much work a process needs before splitting it up becomes worthwhile, as opposed to keeping it in a single thread.

Generally, this is the disadvantage that comes with multi-processing. Few algorithms suit it, because most are linear and each step depends on the one before, so they can't be split up. This seems to be the reason that multi-processing is mostly seen in image processing, databases, and other computationally intensive work that is not linear or dependent.
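For point 3, one possible way to pick a CPU for each worker thread is to walk the process affinity mask round-robin. This is only a sketch: PickCpu is a hypothetical helper, it assumes a 32-bit mask and a non-negative worker index, and the returned single-bit mask would then be handed to something like SetThreadInfo from the unit above.

```pascal
program PickCpuDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

// Returns a single-bit mask for the CPU that worker number Index (0-based)
// should use, cycling through the CPUs allowed by ProcMask.
// Returns 0 when ProcMask selects no CPU at all.
function PickCpu(ProcMask: Cardinal; Index: Integer): Cardinal;
var
  Bit, Count, Target, Seen: Integer;
begin
  Result := 0;
  // First pass: count the CPUs the process may use.
  Count := 0;
  for Bit := 0 to 31 do
    if (ProcMask and (Cardinal(1) shl Bit)) <> 0 then
      Inc(Count);
  if Count = 0 then
    Exit;
  // Second pass: find the Target-th allowed CPU.
  Target := Index mod Count;
  Seen := 0;
  for Bit := 0 to 31 do
    if (ProcMask and (Cardinal(1) shl Bit)) <> 0 then
    begin
      if Seen = Target then
      begin
        Result := Cardinal(1) shl Bit;
        Exit;
      end;
      Inc(Seen);
    end;
end;

begin
  // Process allowed on CPU 2 and CPU 4 (mask $0A).
  WriteLn(PickCpu($0A, 0)); // prints 2: first worker goes to CPU 2
  WriteLn(PickCpu($0A, 1)); // prints 8: second worker goes to CPU 4
  WriteLn(PickCpu($0A, 2)); // prints 2: third worker wraps around to CPU 2
end.
```

Whether pinning like this beats leaving placement to the scheduler is exactly the open question in this thread, so treat it as something to measure rather than a recommendation.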

Hopefully this will be a useful discussion.

Measurement is not management.
 
Most of my applications are Windows services, and they are ALL multithreaded. I see real world benefits there (on SMP systems).
I must admit that I use less threading in my Win32 GUI apps. For these kinds of apps, it depends on the kind of work they must do and the level of UI "smoothness" needed.
I never play with affinities, the OS should take care of that, not the application.

Cheers,
Daddy

-----------------------------------------------------
What You See Is What You Get
Never underestimate tha powah of tha google!
 
Very good post, Glenn. I shall look into this more at some point.
 
Quote:
I see real world benefits there (on SMP systems).

I'm seeing benefits on the algorithm I'm testing with this (Quicksort) using multiple threads. Most certainly. As point 4 indicates, doing quicksort on a table with a large number of elements is providing a definite benefit.

Quote:
I never play with affinities, the OS should take care of that, not the application.

I thought that originally, but when I compared simply creating threads against tying each thread to a CPU, the run with threads assigned to CPUs was about 3-4 seconds faster on average.

Since the time spent in each thread is variable with quicksort, you just gave me the idea to find a job that can be evenly divided between threads. Trying that should indicate whether there is a difference between simply multi-threading and assigning each thread to a CPU. Either way, it seems like another pretty good learning exercise on this topic.
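As a starting point for such a test, a job over Total items can be cut into near-equal contiguous chunks, one per thread. The sketch below is an assumption about how one might do it, not code from this thread; ChunkFirst and ChunkSize are hypothetical helpers, and chunk indices are 0-based.

```pascal
program ChunkDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

// Number of items in chunk ChunkIndex when Total items are shared among
// ChunkCount workers; the first (Total mod ChunkCount) chunks get one extra.
function ChunkSize(Total, ChunkCount, ChunkIndex: Integer): Integer;
begin
  Result := Total div ChunkCount;
  if ChunkIndex < Total mod ChunkCount then
    Inc(Result);
end;

// Index of the first item in chunk ChunkIndex (items are numbered from 0).
function ChunkFirst(Total, ChunkCount, ChunkIndex: Integer): Integer;
var
  Extra: Integer;
begin
  Extra := Total mod ChunkCount;
  Result := ChunkIndex * (Total div ChunkCount);
  // Every earlier chunk that received an extra item shifts the start by one.
  if ChunkIndex < Extra then
    Inc(Result, ChunkIndex)
  else
    Inc(Result, Extra);
end;

var
  I, First: Integer;

begin
  // Split 10 items among 4 workers: prints 0..2, 3..5, 6..7, 8..9
  for I := 0 to 3 do
  begin
    First := ChunkFirst(10, 4, I);
    WriteLn(First, '..', First + ChunkSize(10, 4, I) - 1);
  end;
end.
```

With chunks of nearly equal size, each thread should run for roughly the same time, which makes the comparison between plain threads and CPU-pinned threads easier to read.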

Measurement is not management.
 
Multithreading is not only about performance (i.e. doing a task quicker by spreading it over multiple CPUs, like the quicksort).
It's also about doing tasks that would otherwise block the GUI. That's what I mean by smoothness.

I made this little unit that creates a background thread into which I can push tasks (like updating a database table).
This unit is linked to other self-made units, so you can't use this code directly, but it will give you an idea:

Code:
unit u_class_backgroundthread;

interface

uses
  // own units
  u_class_debugger, u_class_sqlinterface,
  // delphi units
  Windows, Classes, SysUtils, Contnrs;

type
  // Helper class to make TObjectQueue thread safe
  TSafeObjectQueue = class(TObject)
  private
    ObjectQueue : TObjectQueue;
    FSection    : TRTLCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    function Push(AObject: TObject): TObject;
    function Pop: TObject;
    function Peek: TObject;
    function Count : Integer;
  end;

  TObjectProc = procedure of object;
  // class to define work objects, these will do work that does not need interaction with the main thread
  TWorkObject = class(TObject)
  private
//    FWorkType      : TWorkType;
    FWorkData      : string;
    FNotifyEvent   : TNotifyEvent;
    FExecutionTime : TDateTime;
    FObject        : TObject;
    FObjectProc    : TObjectProc;
  public
    constructor Create(AObject : TObject; WorkData : string; ExecutionTime : TDateTime); overload;
    constructor Create(AObject : TObject; WorkData : string); overload;
    constructor Create(AObjectProc : TObjectProc); overload;
    constructor Create(AObjectProc : TObjectProc; ExecutionTime : TDateTime); overload;
  published
    property WorkObject : TObject read FObject write FObject;
    property WorkCode : TObjectProc read FObjectProc write FObjectProc;
    property WorkData : string read FWorkData write FWorkData;
    property OnWorkDone : TNotifyEvent read FNotifyEvent write FNotifyEvent;
    property ExecutionTime : TDateTime read FExecutionTime write FExecutionTime;
  end;

  // work object to do SQL updates in the background
  TSQLWorkObject = class(TWorkObject)
    FActivateSQL  : Boolean;
  private
    function GetSQLInterface: TSQLInterface;
  published
  public
    property SQLInterface : TSQLInterface read GetSQLInterface;
    constructor Create(SQLInterface : TSQLInterface; SQLQuery : string; Activate : Boolean); overload;
  end;

  // work object to execute object methods in the background
  // method must be thread-safe!!! (so no UI)
  TMethodWorkObject = class(TWorkObject)
  public
    constructor Create(const WorkMethod : TObjectProc; WorkNotify : TNotifyEvent); overload;
    constructor Create(const WorkMethod : TObjectProc; WorkNotify : TNotifyEvent; ExecutionTime : TDateTime); overload;
  end;

  TNotifyObject = class(TObject)
    NotifyEvent : TNotifyEvent;
  end;

  TBackgroundThread = class(TThread)
  private
    Debug               : TDebuggerSlot;
    FSection            : TRTLCriticalSection;
    FSpeed              : Integer;
    NotifyObjects        : TSafeObjectQueue;
    WorkObjects         : TSafeObjectQueue;
    procedure Output(Routine: string; Str : string);
    procedure SyncNotify;
    procedure DoNotify;
    procedure DoWork;
    procedure DoWorkSQLUpdate(WorkObject : TSQLWorkObject);
    procedure DoWorkMethod(WorkObject : TMethodWorkObject);
    procedure FreeWorkObjects;
    procedure FreeNotifyObjects;
  protected
    procedure Execute; override;
  public
    ThreadDone : THandle;
    constructor CreateMe(ADebug : TDebuggerSlot; Speed : Integer);
    destructor Destroy; override;
    procedure AddWork(WorkObject : TWorkObject);
    procedure AddNotify(NotifyEvent : TNotifyEvent);
  end;

implementation

{ TSafeObjectQueue }

function TSafeObjectQueue.Count: Integer;
begin
 EnterCriticalSection(FSection);
 try
  Result := ObjectQueue.Count;
 finally
  LeaveCriticalSection(FSection);
 end;
end;

function TSafeObjectQueue.Peek: TObject;
begin
 EnterCriticalSection(FSection);
 try
  Result := ObjectQueue.Peek;
 finally
  LeaveCriticalSection(FSection);
 end;
end;

function TSafeObjectQueue.Pop: TObject;
begin
 EnterCriticalSection(FSection);
 try
  Result := ObjectQueue.Pop;
 finally
  LeaveCriticalSection(FSection);
 end;
end;

function TSafeObjectQueue.Push(AObject: TObject): TObject;
begin
 EnterCriticalSection(FSection);
 try
  Result := ObjectQueue.Push(AObject);
 finally
  LeaveCriticalSection(FSection);
 end;
end;

constructor TSafeObjectQueue.Create;
begin
 FillChar(FSection, SizeOf(FSection), 0);
 InitializeCriticalSection(FSection);
 ObjectQueue := TObjectQueue.Create;
end;

destructor TSafeObjectQueue.Destroy;
begin
 FreeAndNil(ObjectQueue);
 DeleteCriticalSection(FSection);
 inherited;
end;

{ TWorkObject }

constructor TWorkObject.Create(AObject : TObject; WorkData: string; ExecutionTime : TDateTime);
begin
 FWorkData := WorkData;
// FWorkType := WorkType;
 FExecutionTime := ExecutionTime;
 FObject := AObject;
end;

constructor TWorkObject.Create(AObject : TObject; WorkData: string);
begin
 Create(AObject, WorkData, 0);
end;

constructor TWorkObject.Create(AObjectProc: TObjectProc);
begin
 Create(nil, '');
 FObjectProc := AObjectProc;
end;

constructor TWorkObject.Create(AObjectProc: TObjectProc; ExecutionTime: TDateTime);
begin
 Create(nil, '', ExecutionTime);
 FObjectProc := AObjectProc;
end;

{ TSQLWorkObject }

constructor TSQLWorkObject.Create(SQLInterface: TSQLInterface; SQLQuery: string; Activate: Boolean);
begin
 FActivateSQL := Activate;
 Create(SQLInterface, SQLQuery, 0);
end;

function TSQLWorkObject.GetSQLInterface: TSQLInterface;
begin
 Result := TSQLInterface(WorkObject);
end;

{ TMethodWorkObject }

constructor TMethodWorkObject.Create(const WorkMethod: TObjectProc; WorkNotify: TNotifyEvent);
begin
 Create(WorkMethod);
 Self.OnWorkDone := WorkNotify;
end;

constructor TMethodWorkObject.Create(const WorkMethod: TObjectProc; WorkNotify: TNotifyEvent; ExecutionTime: TDateTime);
begin
 Create(WorkMethod, ExecutionTime);
 Self.OnWorkDone := WorkNotify;
end;

{ TBackgroundThread }

procedure TBackgroundThread.Output(Routine: string; Str: string);
begin
 if Assigned(Debug) then
  Debug.Output(Self, Routine, Str);
end;

procedure TBackgroundThread.DoWorkSQLUpdate(WorkObject: TSQLWorkObject);
begin
 // execute sql update (current thread context)
 try
  if WorkObject.FActivateSQL then
   WorkObject.SQLInterface.Active := True;
  WorkObject.SQLInterface.UpdateQuery(WorkObject.WorkData);
 finally
  if WorkObject.FActivateSQL then
   WorkObject.SQLInterface.Active := False;
 end;
 // inform caller (main thread context)
 if Assigned(WorkObject.OnWorkDone) then
  AddNotify(WorkObject.OnWorkDone);
end;

procedure TBackgroundThread.DoWorkMethod(WorkObject: TMethodWorkObject);
begin
 // execute method (current thread context)
 WorkObject.FObjectProc;
 // inform caller (main thread context)
 if Assigned(WorkObject.OnWorkDone) then
  AddNotify(WorkObject.OnWorkDone);
end;

procedure TBackgroundThread.SyncNotify;
begin
 // execute notifyevent (main thread context)
 if NotifyObjects.Count > 0 then
  Synchronize(DoNotify);
end;

procedure TBackgroundThread.DoNotify;

var NotifyObject : TNotifyObject;


begin
 if NotifyObjects.Count > 0 then
  begin
   if Terminated then Exit;
   NotifyObject := TNotifyObject(NotifyObjects.Pop);
   try
    NotifyObject.NotifyEvent(nil);
   except
    on E: Exception do
     if Assigned(Debug) then
      Debug.ExceptionOutput(Self, 'DoNotify', E);
   end;
   FreeAndNil(NotifyObject);
  end;
end;

procedure TBackgroundThread.DoWork;

var WorkObject : TWorkObject;

begin
 // all objects are arranged as a FIFO
 if WorkObjects.Count > 0 then
  begin
   if Terminated then Exit;
   WorkObject := TWorkObject(WorkObjects.Pop);
   if WorkObject.ExecutionTime > 0 then
    begin
     // it is no time yet to execute this object, put it back as last object
     if WorkObject.ExecutionTime > Now then
      begin
       WorkObjects.Push(WorkObject);
       Exit;
      end;
    end;
   try
    if WorkObject is TSQLWorkObject then
     DoWorkSQLUpdate(TSQLWorkObject(WorkObject)) else
    if WorkObject is TMethodWorkObject then
     DoWorkMethod(TMethodWorkObject(WorkObject));
   except
    on E: Exception do
     if Assigned(Debug) then
      Debug.ExceptionOutput(Self, 'DoWork', E);
   end;
   FreeAndNil(WorkObject);
  end;
end;

procedure TBackgroundThread.FreeNotifyObjects;

var NotifyObject : TNotifyObject;

begin
 while NotifyObjects.Count > 0 do begin
  NotifyObject := TNotifyObject(NotifyObjects.Pop);
  FreeAndNil(NotifyObject);
 end;
 FreeAndNil(NotifyObjects);
end;

procedure TBackgroundThread.FreeWorkObjects;

var WorkObject : TWorkObject;

begin
 while WorkObjects.Count > 0 do begin
  WorkObject := TWorkObject(WorkObjects.Pop);
  FreeAndNil(WorkObject);
 end;
 FreeAndNil(WorkObjects);
end;

procedure TBackgroundThread.AddNotify(NotifyEvent: TNotifyEvent);

var NotifyObject : TNotifyObject;

begin
 if Assigned(NotifyEvent) then
  begin
   NotifyObject := TNotifyObject.Create;
   NotifyObject.NotifyEvent := NotifyEvent;
   NotifyObjects.Push(NotifyObject);
  end;
end;

procedure TBackgroundThread.AddWork(WorkObject: TWorkObject);
begin
 WorkObjects.Push(WorkObject);
end;

// main thread loop
procedure TBackgroundThread.Execute;
begin
 Output('Execute', 'Start of thread execution');
 while not Terminated do
  begin
   Sleep(FSpeed);
   DoWork;
   SyncNotify;
  end;
 Output('Execute', 'End of thread execution');
 SetEvent(ThreadDone);
end;

constructor TBackgroundThread.CreateMe(ADebug : TDebuggerSlot; Speed : Integer);
begin
 Create(True);
 FreeOnTerminate := False;
 // create objects
 Debug := ADebug;
 FSpeed := Speed;
 ThreadDone := CreateEvent(nil, True, False, nil);
 Output('Create','creating objects');
 FillChar(FSection, SizeOf(FSection), 0);
 InitializeCriticalSection(FSection);
 NotifyObjects := TSafeObjectQueue.Create;
 WorkObjects := TSafeObjectQueue.Create;
 Output('Create','create complete');
end;

destructor TBackgroundThread.Destroy;
begin
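 // destroy objects (the queues are freed before inherited Destroy runs,
 // so the thread should already have finished; callers can wait on ThreadDone)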
 Output('Destroy','destroying objects');
 FreeWorkObjects;
 FreeNotifyObjects;
 DeleteCriticalSection(FSection);
 CloseHandle(ThreadDone);
 Output('Destroy','destroy complete');
 inherited;
end;

end.

Cheers,
Daddy

-----------------------------------------------------
What You See Is What You Get
Never underestimate tha powah of tha google!
 
The article is Windows 7 centric, though. Is it really wise to program anything based on the behavior of an OS that hasn't been RTM released yet?

Measurement is not management.
 
Oh, it sure is RTM!
Running it on my development box :)

/Daddy

-----------------------------------------------------
What You See Is What You Get
Never underestimate tha powah of tha google!
 
I know this is a little old but I have been reading up a lot more on threading lately and it led me to this.
I just wanted to say that you guys inspire me to be a more knowledgeable and competent coder. =) No matter how much I learn you guys still make me feel like a noob. It is quite humbling.

~
“Your request is not unlike your lower intestine: stinky, and loaded with danger.” — Ace Ventura.
 